[infinispan-commits] Infinispan SVN: r1359 - in trunk/server/memcached: src/main/java/org/infinispan/server/core/netty and 3 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Mon Jan 11 10:12:20 EST 2010


Author: galder.zamarreno at jboss.com
Date: 2010-01-11 10:12:19 -0500 (Mon, 11 Jan 2010)
New Revision: 1359

Added:
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/Main.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java
Removed:
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java
Modified:
   trunk/server/memcached/pom.xml
   trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyChannelUpstreamHandler.java
   trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyServerBootstrap.java
   trunk/server/memcached/src/test/java/org/infinispan/server/memcached/ClusterTest.java
   trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java
   trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java
   trunk/server/memcached/src/test/java/org/infinispan/server/memcached/test/MemcachedTestingUtil.java
Log:
[ISPN-173] (Build memcached server module) Fixed shutdown of memcached server and implemented basic command line options.

Modified: trunk/server/memcached/pom.xml
===================================================================
--- trunk/server/memcached/pom.xml	2010-01-11 11:57:30 UTC (rev 1358)
+++ trunk/server/memcached/pom.xml	2010-01-11 15:12:19 UTC (rev 1359)
@@ -17,6 +17,7 @@
    <properties>
       <version.netty>3.1.5.GA</version.netty>
       <version.spymemcached>2.4.2</version.spymemcached>
+      <version.gnu.getopt>1.0.13</version.gnu.getopt>
    </properties>
 
    <dependencies>
@@ -46,6 +47,12 @@
          <version>${version.spymemcached}</version>
          <scope>test</scope>
       </dependency>
+
+      <dependency>
+         <groupId>gnu-getopt</groupId>
+         <artifactId>getopt</artifactId>
+         <version>${version.gnu.getopt}</version>
+       </dependency>
    </dependencies>
 
   <repositories>

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyChannelUpstreamHandler.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyChannelUpstreamHandler.java	2010-01-11 11:57:30 UTC (rev 1358)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyChannelUpstreamHandler.java	2010-01-11 15:12:19 UTC (rev 1359)
@@ -26,8 +26,10 @@
 import org.infinispan.server.core.CommandHandler;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
 
 /**
  * NettyUpstreamHandler.
@@ -38,12 +40,20 @@
 @ChannelPipelineCoverage("one")
 public class NettyChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
    final CommandHandler handler;
+   final ChannelGroup group;
 
-   public NettyChannelUpstreamHandler(CommandHandler handler) {
+   public NettyChannelUpstreamHandler(CommandHandler handler, ChannelGroup group) {
       this.handler = handler;
+      this.group = group;
    }
-   
+
    @Override
+   public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+      group.add(e.getChannel());
+      super.channelOpen(ctx, e);
+   }
+
+   @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
       try {
          handler.messageReceived(new NettyChannelHandlerContext(ctx), new NettyMessageEvent(e));
@@ -54,4 +64,6 @@
       }
    }
 
+
+
 }

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyServerBootstrap.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyServerBootstrap.java	2010-01-11 11:57:30 UTC (rev 1358)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyServerBootstrap.java	2010-01-11 15:12:19 UTC (rev 1359)
@@ -23,11 +23,19 @@
 package org.infinispan.server.core.netty;
 
 import java.net.SocketAddress;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 
+import org.infinispan.server.core.CommandHandler;
+import org.infinispan.server.core.netty.memcached.NettyMemcachedDecoder;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
 import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 
 /**
@@ -37,29 +45,46 @@
  * @since 4.0
  */
 public class NettyServerBootstrap implements org.infinispan.server.core.ServerBootstrap {
+   private static final Log log = LogFactory.getLog(NettyServerBootstrap.class);
    final ChannelPipelineFactory pipeline;
    final SocketAddress address;
+   final ChannelFactory factory;
+   final ChannelGroup serverChannels = new DefaultChannelGroup("memcached-channels");
+   final ChannelGroup acceptedChannels = new DefaultChannelGroup("memcached-accepted");
    
-   public NettyServerBootstrap(ChannelPipelineFactory pipeline, SocketAddress address) {
-      this.pipeline = pipeline;
+   public NettyServerBootstrap(CommandHandler commandHandler, NettyMemcachedDecoder decoder, SocketAddress address, 
+            ExecutorService masterExecutor, ExecutorService workerExecutor, int workerThreads) {
+      NettyChannelUpstreamHandler handler = new NettyChannelUpstreamHandler(commandHandler, acceptedChannels);
+      this.pipeline = new NettyChannelPipelineFactory(decoder, handler);
       this.address = address;
+      if (workerThreads == 0) {
+         factory = new NioServerSocketChannelFactory(masterExecutor, workerExecutor);
+      } else {
+         factory = new NioServerSocketChannelFactory(masterExecutor, workerExecutor, workerThreads);
+      }
    }
 
    @Override
    public void start() {
-      ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
       ServerBootstrap bootstrap = new ServerBootstrap(factory);
       bootstrap.setPipelineFactory(pipeline);
-      bootstrap.bind(address);
+      Channel ch = bootstrap.bind(address);
+      serverChannels.add(ch);
    }
 
    @Override
    public void stop() {
-      // TODO how to close shutdown the nettty server?
+      serverChannels.close().awaitUninterruptibly();
+      ChannelGroupFuture future = acceptedChannels.close().awaitUninterruptibly();
+      if (!future.isCompleteSuccess()) {
+         log.warn("Channel group did not completely close");
+         for (Channel ch : future.getGroup()) {
+            if (ch.isBound()) {
+               log.warn(ch + " is still connected to " + ch.getRemoteAddress());
+            }
+         }
+      }
+      log.debug("Channel group completely closed, release external resources");
+      factory.releaseExternalResources();
    }
-
-   public static NettyServerBootstrap newNettyServerBootstrap(ChannelPipelineFactory pipeline, SocketAddress address) {
-      return new NettyServerBootstrap(pipeline, address);
-   }
-
 }

Added: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/Main.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/Main.java	                        (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/Main.java	2010-01-11 15:12:19 UTC (rev 1359)
@@ -0,0 +1,269 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+
+import gnu.getopt.Getopt;
+import gnu.getopt.LongOpt;
+
+import org.infinispan.Version;
+
+/**
+ * Main.
+ * 
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class Main {
+
+   public static final String PROP_KEY_PORT = "infinispan.memcached.port";
+   public static final String PROP_KEY_HOST = "infinispan.memcached.host";
+   public static final String PROP_KEY_MASTER_THREADS = "infinispan.memcached.master.threads";
+   public static final String PROP_KEY_WORKER_THREADS = "infinispan.memcached.worker.threads";
+   public static final String PROP_KEY_CACHE_CONFIG = "infinispan.memcached.cache.config";
+
+   public static final int PORT_DEFAULT = 11211;
+   public static final String HOST_DEFAULT = "127.0.0.1";
+   public static final int MASTER_THREADS_DEFAULT = 0;
+   public static final int WORKER_THREADS_DEFAULT = 0;
+
+   /**
+    * Server properties.  This object holds all of the required
+    * information to get the server up and running. Use System
+    * properties for defaults.
+    */
+   private Map<String, String> props = new HashMap<String, String>();
+
+   private TextServer server;
+
+   /**
+    * Explicit constructor.
+    */
+   public Main() {
+      // Set default properties
+      Properties sysProps = System.getProperties();
+      for (Object propName : sysProps.keySet()) {
+         String propNameString = (String) propName;
+         String propValue = (String) sysProps.get(propNameString);
+         props.put(propNameString, propValue);
+      }
+   }
+
+   public void boot(String[] args) throws Exception {
+      // First process the command line to pickup custom props/settings
+      processCommandLine(args);
+
+      int port = props.get(PROP_KEY_PORT) == null ? PORT_DEFAULT : Integer.parseInt(props.get(PROP_KEY_PORT));
+      String host = props.get(PROP_KEY_HOST) == null ? HOST_DEFAULT : props.get(PROP_KEY_HOST);
+      int masterThreads = props.get(PROP_KEY_MASTER_THREADS) == null ? MASTER_THREADS_DEFAULT : Integer.parseInt(props.get(PROP_KEY_MASTER_THREADS));
+      if (masterThreads < 0) {
+         throw new IllegalArgumentException("Master threads can't be lower than 0: " + masterThreads);
+      }
+      int workerThreads = props.get(PROP_KEY_WORKER_THREADS) == null ? WORKER_THREADS_DEFAULT : Integer.parseInt(props.get(PROP_KEY_WORKER_THREADS));
+      if (workerThreads < 0) {
+         throw new IllegalArgumentException("Worker threads can't be lower than 0: " + masterThreads);
+      }
+      String configFile = props.get(PROP_KEY_CACHE_CONFIG);
+
+      server = new TextServer(host, port, configFile, masterThreads, workerThreads);
+      // Make a shutdown hook
+      addShutdownHook(new ShutdownHook(server));
+
+      server.start();
+   }
+
+   private void processCommandLine(String[] args) throws Exception {
+      // set this from a system property or default to jboss
+      String programName = System.getProperty("program.name", "memcached");
+      String sopts = "-:hD:Vp:l:m:t:c";
+      LongOpt[] lopts =
+      {new LongOpt("help", LongOpt.NO_ARGUMENT, null, 'h'),
+            new LongOpt("version", LongOpt.NO_ARGUMENT, null, 'V'),
+            new LongOpt("port", LongOpt.REQUIRED_ARGUMENT, null, 'p'),
+            new LongOpt("host", LongOpt.REQUIRED_ARGUMENT, null, 'l'),
+            new LongOpt("master_threads", LongOpt.REQUIRED_ARGUMENT, null, 'm'),
+            new LongOpt("worker_threads", LongOpt.REQUIRED_ARGUMENT, null, 't'),
+            new LongOpt("cache_config", LongOpt.REQUIRED_ARGUMENT, null, 'c'),};
+      Getopt getopt = new Getopt(programName, args, sopts, lopts);
+
+      int code;
+      while ((code = getopt.getopt()) != -1)
+      {
+         switch (code)
+         {
+            case ':' :
+            case '?' :
+               // for now both of these should exit with error status
+               System.exit(1);
+               break; // for completeness
+            case 1 :
+               // this will catch non-option arguments
+               // (which we don't currently care about)
+               System.err.println(programName + ": unused non-option argument: " + getopt.getOptarg());
+               break; // for completeness
+            case 'h' :
+               // show command line help
+               System.out.println("usage: " + programName + " [options]");
+               System.out.println();
+               System.out.println("options:");
+               System.out.println("    -h, --help                     Show this help message");
+               System.out.println("    -V, --version                  Show version information");
+               System.out.println("    --                             Stop processing options");
+               System.out.println("    -p, --port=<num>               TCP port number to listen on (default: 11211)");
+               System.out.println("    -l, --host=<host or ip>        Interface to listen on (default: 127.0.0.1, localhost)");
+               System.out.println("    -m, --master_threads=<num>     Number of threads accepting incoming connections. (default: unlimited while resources are available)");
+               System.out.println("    -t, --work_threads=<num>       Number of threads processing incoming requests and sending responses. (default: unlimited while resources are available)");
+               System.out.println("    -c, --cache_config=<filename>  Cache configuration file (default: creates caches with default values)");
+               System.out.println("    -D<name>[=<value>]             Set a system property");
+               System.out.println();
+               System.exit(0);
+               break; // for completeness
+            case 'V' :
+               Version.printFullVersionInformation();
+               break;
+            case 'p' :
+               props.put(PROP_KEY_PORT, getopt.getOptarg());
+               break;
+            case 'l' :
+               props.put(PROP_KEY_HOST, getopt.getOptarg());
+               break;
+            case 'm' :
+               props.put(PROP_KEY_MASTER_THREADS, getopt.getOptarg());
+               break;
+            case 't' :
+               props.put(PROP_KEY_WORKER_THREADS, getopt.getOptarg());
+               break;
+            case 'c' :
+               props.put(PROP_KEY_CACHE_CONFIG, getopt.getOptarg());
+               break;
+            case 'D' :
+               // set a system property
+               String arg = getopt.getOptarg();
+               String name, value;
+               int i = arg.indexOf("=");
+               if (i == -1) {
+                  name = arg;
+                  value = "true";
+               } else {
+                  name = arg.substring(0, i);
+                  value = arg.substring(i + 1, arg.length());
+               }
+               System.setProperty(name, value);
+               break;
+            default :
+               // this should not happen,
+               // if it does throw an error so we know about it
+               throw new Error("unhandled option code: " + code);
+         }
+      }
+   }
+
+   public static void main(final String[] args) throws Exception {
+      Callable<Void> worker = new Callable<Void>() {
+         @Override
+         public Void call() throws Exception {
+            try {
+               Main main = new Main();
+               main.boot(args);
+            } catch (Exception e) {
+               System.err.println("Failed to boot JBoss:");
+               e.printStackTrace();
+               throw e;
+            }
+            return null;
+         }
+      };
+
+      Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+         @Override
+         public Thread newThread(Runnable r) {
+            Thread t = new Thread(r, System.getProperty("program.name") + "-main");
+            t.setDaemon(true);
+            return t;
+         }
+      }).submit(worker);
+   }
+
+   /**
+    * Adds the specified shutdown hook
+    * 
+    * @param shutdownHook
+    */
+   static void addShutdownHook(final Thread shutdownHook) {
+      AccessController.doPrivileged(new PrivilegedAction<Void>() {
+         public Void run() {
+            Runtime.getRuntime().addShutdownHook(shutdownHook);
+            return null;
+         }
+      });
+   }
+
+   /**
+    * ShutdownHook
+    */
+   private static class ShutdownHook extends Thread {
+      private final TextServer server;
+
+      ShutdownHook(TextServer server) {
+         this.server = server;
+      }
+
+      @Override
+      public void run() {
+         // If we have a server
+         if (server != null) {
+            // Log out
+            System.out.println("Posting Shutdown Request to the memcached server...");
+            // Start in new thread to give positive feedback to requesting client of success.
+            Future<Void> f = Executors.newSingleThreadExecutor().submit(new Callable<Void>() {
+               @Override
+               public Void call() throws Exception {
+                  server.stop();
+                  return null;
+               }
+            });
+
+            // Block until done
+            try {
+               f.get();
+            } catch (InterruptedException ie) {
+               // Clear the flag
+               Thread.interrupted();
+            } catch (ExecutionException e) {
+               throw new RuntimeException("Exception encountered in shutting down the server", e);
+            }
+         }
+      }
+   }
+
+}

Deleted: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java	2010-01-11 11:57:30 UTC (rev 1358)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java	2010-01-11 15:12:19 UTC (rev 1359)
@@ -1,76 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
- * individual contributors as indicated by the @author tags. See the
- * copyright.txt file in the distribution for a full listing of
- * individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.infinispan.server.memcached;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.infinispan.Cache;
-import org.infinispan.server.core.ServerBootstrap;
-import org.infinispan.server.core.netty.NettyChannelPipelineFactory;
-import org.infinispan.server.core.netty.NettyChannelUpstreamHandler;
-import org.infinispan.server.core.netty.NettyServerBootstrap;
-import org.infinispan.server.core.netty.memcached.NettyMemcachedDecoder;
-import org.infinispan.server.core.InterceptorChain;
-import org.infinispan.server.memcached.commands.TextCommandHandler;
-import org.infinispan.server.memcached.interceptors.TextProtocolInterceptorChainFactory;
-
-/**
- * TextServer.
- * 
- * @author Galder Zamarreño
- * @since 4.0
- */
-public class MemcachedTextServer {
-   private final Cache cache;
-   private final int port;
-   private final ScheduledExecutorService scheduler;
-   private ServerBootstrap bootstrap;
-   
-   public MemcachedTextServer(Cache cache, int port) {
-      this.cache = cache;
-      this.port = port;
-      this.scheduler = Executors.newScheduledThreadPool(1);
-   }
-
-   public int getPort() {
-      return port;
-   }
-
-   public void start() {
-      InterceptorChain chain = TextProtocolInterceptorChainFactory.getInstance(cache).buildInterceptorChain();
-      NettyMemcachedDecoder decoder = new NettyMemcachedDecoder(cache, chain, scheduler);
-      TextCommandHandler commandHandler = new TextCommandHandler(cache, chain);
-      NettyChannelUpstreamHandler handler = new NettyChannelUpstreamHandler(commandHandler);
-      NettyChannelPipelineFactory pipelineFactory = new NettyChannelPipelineFactory(decoder, handler);
-      bootstrap = new NettyServerBootstrap(pipelineFactory, new InetSocketAddress(port));
-      bootstrap.start();
-   }
-
-   public void stop() {
-      bootstrap.stop();
-      cache.stop();
-      scheduler.shutdown();
-   }
-}

Copied: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java (from rev 1357, trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java)
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java	                        (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java	2010-01-11 15:12:19 UTC (rev 1359)
@@ -0,0 +1,156 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.infinispan.Cache;
+import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.server.core.ServerBootstrap;
+import org.infinispan.server.core.netty.NettyServerBootstrap;
+import org.infinispan.server.core.netty.memcached.NettyMemcachedDecoder;
+import org.infinispan.server.core.InterceptorChain;
+import org.infinispan.server.memcached.commands.TextCommandHandler;
+import org.infinispan.server.memcached.interceptors.TextProtocolInterceptorChainFactory;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+/**
+ * TextServer.
+ * 
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class TextServer {
+   private static final Log log = LogFactory.getLog(TextServer.class);
+   private final Cache cache;
+   private final String host;
+   private final int port;
+   private final int masterThreads;
+   private final int workerThreads;
+   private final ScheduledExecutorService scheduler;
+   private ServerBootstrap bootstrap;
+   private ExecutorService masterExecutor;
+   private ExecutorService workerExecutor;
+   private final AtomicInteger masterThreadNumber = new AtomicInteger(1);
+   private final AtomicInteger workerThreadNumber = new AtomicInteger(1);
+
+   public TextServer(String host, int port, String configFile, int masterThreads, int workerThreads) throws IOException {
+      this(host, port, configFile == null 
+               ? new DefaultCacheManager().getCache() 
+               : new DefaultCacheManager(configFile).getCache(), 
+               masterThreads, masterThreads);
+      if (configFile == null) {
+         log.debug("Using cache manager using configuration defaults");
+      } else {
+         log.debug("Using cache manager configured from {0}", configFile);
+      }
+   }
+
+   public TextServer(String host, int port, Cache cache, int masterThreads, int workerThreads) throws IOException {
+      this.host = host;
+      this.port = port;
+      this.masterThreads = masterThreads;
+      this.workerThreads = workerThreads;
+      this.cache = cache;
+      this.scheduler = Executors.newScheduledThreadPool(1);
+   }
+
+   public int getPort() {
+      return port;
+   }
+
+   public void start() throws Exception {
+      InterceptorChain chain = TextProtocolInterceptorChainFactory.getInstance(cache).buildInterceptorChain();
+      NettyMemcachedDecoder decoder = new NettyMemcachedDecoder(cache, chain, scheduler);
+      TextCommandHandler commandHandler = new TextCommandHandler(cache, chain);
+
+      ThreadFactory tf = new MemcachedThreadFactory(cache, ExecutorType.MASTER);
+      if (masterThreads == 0) {
+         log.debug("Configured unlimited threads for master thread pool");
+         masterExecutor = Executors.newCachedThreadPool(tf);
+      } else {
+         log.debug("Configured {0} threads for master thread pool", masterThreads);
+         masterExecutor = Executors.newFixedThreadPool(masterThreads, tf);
+      }
+
+      tf = new MemcachedThreadFactory(cache, ExecutorType.WORKER);
+      if (workerThreads == 0) {
+         log.debug("Configured unlimited threads for worker thread pool");
+         workerExecutor = Executors.newCachedThreadPool(tf);
+      } else {
+         log.debug("Configured {0} threads for worker thread pool", workerThreads);
+         workerExecutor = Executors.newFixedThreadPool(workerThreads, tf);
+      }
+
+      bootstrap = new NettyServerBootstrap(commandHandler, decoder, new InetSocketAddress(host, port), 
+               masterExecutor, workerExecutor, workerThreads);
+      bootstrap.start();
+      log.info("Started Memcached text server bound to {0}:{1}", host, port);
+   }
+
+   public void stop() {
+      masterExecutor.shutdown();
+      workerExecutor.shutdown();
+      bootstrap.stop();
+      cache.stop();
+      scheduler.shutdown();
+   }
+
+   private static class MemcachedThreadFactory implements ThreadFactory {
+      final Cache cache;
+      final ExecutorType type;
+
+      MemcachedThreadFactory(Cache cache, ExecutorType type) {
+         this.cache = cache;
+         this.type = type;
+      }
+
+      @Override
+      public Thread newThread(Runnable r) {
+         Thread t = new Thread(r, System.getProperty("program.name") + "-" + cache.getName() + '-' + type.toString().toLowerCase() + '-' + type.getAndIncrement());
+         t.setDaemon(true);
+         return t;
+      }
+   }
+
+   private static enum ExecutorType {
+      MASTER(1), WORKER(1);
+
+      final AtomicInteger threadCounter;
+
+      ExecutorType(int startIndex) {
+         this.threadCounter = new AtomicInteger(startIndex);
+      }
+
+      int getAndIncrement() {
+         return threadCounter.getAndIncrement();
+      }
+   }
+}

Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/ClusterTest.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/ClusterTest.java	2010-01-11 11:57:30 UTC (rev 1358)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/ClusterTest.java	2010-01-11 15:12:19 UTC (rev 1359)
@@ -39,6 +39,7 @@
 import org.infinispan.config.Configuration;
 import org.infinispan.server.memcached.test.MemcachedTestingUtil;
 import org.infinispan.test.MultipleCacheManagersTest;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 
 /**
@@ -51,8 +52,8 @@
 public class ClusterTest extends MultipleCacheManagersTest {
    MemcachedClient client1;
    MemcachedClient client2;
-   MemcachedTextServer server1;
-   MemcachedTextServer server2;
+   TextServer server1;
+   TextServer server2;
    
    @Override
    protected void createCacheManagers() throws Throwable {
@@ -66,6 +67,12 @@
       client2 = createMemcachedClient(60000, server2.getPort());
    }
 
+   @AfterClass(alwaysRun=true)
+   protected void destroyAfterClass() {
+      server1.stop();
+      server2.stop();
+   }
+
    public void testReplicatedSet(Method m) throws Exception {
       Future<Boolean> f = client1.set(k(m), 0, v(m));
       assert f.get(120, TimeUnit.SECONDS);

Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java	2010-01-11 11:57:30 UTC (rev 1358)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java	2010-01-11 15:12:19 UTC (rev 1359)
@@ -39,6 +39,8 @@
 import org.infinispan.test.SingleCacheManagerTest;
 import org.infinispan.test.TestingUtil;
 import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 
@@ -52,8 +54,9 @@
  */
 @Test(groups = "functional", testName = "server.memcached.FunctionalTest")
 public class FunctionalTest extends SingleCacheManagerTest {
+   static final Log log = LogFactory.getLog(FunctionalTest.class);
    MemcachedClient client;
-   MemcachedTextServer server;
+   TextServer server;
 
    @Override
    protected CacheManager createCacheManager() throws Exception {
@@ -66,6 +69,7 @@
 
    @AfterClass(alwaysRun=true)
    protected void destroyAfterClass() {
+      log.debug("Test finished, close memcached server");
       server.stop();
    }
 

Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java	2010-01-11 11:57:30 UTC (rev 1358)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java	2010-01-11 15:12:19 UTC (rev 1359)
@@ -52,7 +52,7 @@
 public class StatsTest extends SingleCacheManagerTest {
    static final String JMX_DOMAIN = StatsTest.class.getSimpleName();
    MemcachedClient client;
-   MemcachedTextServer server;
+   TextServer server;
 
    @Override
    protected CacheManager createCacheManager() throws Exception {

Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/test/MemcachedTestingUtil.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/test/MemcachedTestingUtil.java	2010-01-11 11:57:30 UTC (rev 1358)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/test/MemcachedTestingUtil.java	2010-01-11 15:12:19 UTC (rev 1359)
@@ -29,7 +29,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.infinispan.Cache;
-import org.infinispan.server.memcached.MemcachedTextServer;
+import org.infinispan.server.memcached.TextServer;
 
 import net.spy.memcached.DefaultConnectionFactory;
 import net.spy.memcached.MemcachedClient;
@@ -41,6 +41,7 @@
  * @since 4.0
  */
 public class MemcachedTestingUtil {
+   private static final String HOST = "127.0.0.1";
 
    private static final ThreadLocal<Integer> threadMemcachedPort = new ThreadLocal<Integer>() {
       private final AtomicInteger uniqueAddr = new AtomicInteger(11211);
@@ -77,11 +78,11 @@
       return new MemcachedClient(d, Arrays.asList(new InetSocketAddress[]{new InetSocketAddress(port)}));
    }
 
-   public static MemcachedTextServer createMemcachedTextServer(Cache cache) {
-      return new MemcachedTextServer(cache, threadMemcachedPort.get());
+   public static TextServer createMemcachedTextServer(Cache cache) throws IOException {
+      return new TextServer(HOST, threadMemcachedPort.get().intValue(), cache, 0, 0);
    }
 
-   public static MemcachedTextServer createMemcachedTextServer(Cache cache, int port) {
-      return new MemcachedTextServer(cache, port);
+   public static TextServer createMemcachedTextServer(Cache cache, int port) throws IOException {
+      return new TextServer(HOST, port, cache, 0, 0);
    }
 }



More information about the infinispan-commits mailing list