[infinispan-commits] Infinispan SVN: r1359 - in trunk/server/memcached: src/main/java/org/infinispan/server/core/netty and 3 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Mon Jan 11 10:12:20 EST 2010
Author: galder.zamarreno at jboss.com
Date: 2010-01-11 10:12:19 -0500 (Mon, 11 Jan 2010)
New Revision: 1359
Added:
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/Main.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java
Removed:
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java
Modified:
trunk/server/memcached/pom.xml
trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyChannelUpstreamHandler.java
trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyServerBootstrap.java
trunk/server/memcached/src/test/java/org/infinispan/server/memcached/ClusterTest.java
trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java
trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java
trunk/server/memcached/src/test/java/org/infinispan/server/memcached/test/MemcachedTestingUtil.java
Log:
[ISPN-173] (Build memcached server module) Fixed shutdown of memcached server and implemented basic command line options.
Modified: trunk/server/memcached/pom.xml
===================================================================
--- trunk/server/memcached/pom.xml 2010-01-11 11:57:30 UTC (rev 1358)
+++ trunk/server/memcached/pom.xml 2010-01-11 15:12:19 UTC (rev 1359)
@@ -17,6 +17,7 @@
<properties>
<version.netty>3.1.5.GA</version.netty>
<version.spymemcached>2.4.2</version.spymemcached>
+ <version.gnu.getopt>1.0.13</version.gnu.getopt>
</properties>
<dependencies>
@@ -46,6 +47,12 @@
<version>${version.spymemcached}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>gnu-getopt</groupId>
+ <artifactId>getopt</artifactId>
+ <version>${version.gnu.getopt}</version>
+ </dependency>
</dependencies>
<repositories>
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyChannelUpstreamHandler.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyChannelUpstreamHandler.java 2010-01-11 11:57:30 UTC (rev 1358)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyChannelUpstreamHandler.java 2010-01-11 15:12:19 UTC (rev 1359)
@@ -26,8 +26,10 @@
import org.infinispan.server.core.CommandHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
/**
* NettyUpstreamHandler.
@@ -38,12 +40,20 @@
@ChannelPipelineCoverage("one")
public class NettyChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
final CommandHandler handler;
+ final ChannelGroup group;
- public NettyChannelUpstreamHandler(CommandHandler handler) {
+ public NettyChannelUpstreamHandler(CommandHandler handler, ChannelGroup group) {
this.handler = handler;
+ this.group = group;
}
-
+
@Override
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ group.add(e.getChannel());
+ super.channelOpen(ctx, e);
+ }
+
+ @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
try {
handler.messageReceived(new NettyChannelHandlerContext(ctx), new NettyMessageEvent(e));
@@ -54,4 +64,6 @@
}
}
+
+
}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyServerBootstrap.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyServerBootstrap.java 2010-01-11 11:57:30 UTC (rev 1358)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyServerBootstrap.java 2010-01-11 15:12:19 UTC (rev 1359)
@@ -23,11 +23,19 @@
package org.infinispan.server.core.netty;
import java.net.SocketAddress;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import org.infinispan.server.core.CommandHandler;
+import org.infinispan.server.core.netty.memcached.NettyMemcachedDecoder;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
/**
@@ -37,29 +45,46 @@
* @since 4.0
*/
public class NettyServerBootstrap implements org.infinispan.server.core.ServerBootstrap {
+ private static final Log log = LogFactory.getLog(NettyServerBootstrap.class);
final ChannelPipelineFactory pipeline;
final SocketAddress address;
+ final ChannelFactory factory;
+ final ChannelGroup serverChannels = new DefaultChannelGroup("memcached-channels");
+ final ChannelGroup acceptedChannels = new DefaultChannelGroup("memcached-accepted");
- public NettyServerBootstrap(ChannelPipelineFactory pipeline, SocketAddress address) {
- this.pipeline = pipeline;
+ public NettyServerBootstrap(CommandHandler commandHandler, NettyMemcachedDecoder decoder, SocketAddress address,
+ ExecutorService masterExecutor, ExecutorService workerExecutor, int workerThreads) {
+ NettyChannelUpstreamHandler handler = new NettyChannelUpstreamHandler(commandHandler, acceptedChannels);
+ this.pipeline = new NettyChannelPipelineFactory(decoder, handler);
this.address = address;
+ if (workerThreads == 0) {
+ factory = new NioServerSocketChannelFactory(masterExecutor, workerExecutor);
+ } else {
+ factory = new NioServerSocketChannelFactory(masterExecutor, workerExecutor, workerThreads);
+ }
}
@Override
public void start() {
- ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
ServerBootstrap bootstrap = new ServerBootstrap(factory);
bootstrap.setPipelineFactory(pipeline);
- bootstrap.bind(address);
+ Channel ch = bootstrap.bind(address);
+ serverChannels.add(ch);
}
@Override
public void stop() {
- // TODO how to close shutdown the nettty server?
+ serverChannels.close().awaitUninterruptibly();
+ ChannelGroupFuture future = acceptedChannels.close().awaitUninterruptibly();
+ if (!future.isCompleteSuccess()) {
+ log.warn("Channel group did not completely close");
+ for (Channel ch : future.getGroup()) {
+ if (ch.isBound()) {
+ log.warn(ch + " is still connected to " + ch.getRemoteAddress());
+ }
+ }
+ }
+ log.debug("Channel group completely closed, release external resources");
+ factory.releaseExternalResources();
}
-
- public static NettyServerBootstrap newNettyServerBootstrap(ChannelPipelineFactory pipeline, SocketAddress address) {
- return new NettyServerBootstrap(pipeline, address);
- }
-
}
Added: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/Main.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/Main.java (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/Main.java 2010-01-11 15:12:19 UTC (rev 1359)
@@ -0,0 +1,269 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+
+import gnu.getopt.Getopt;
+import gnu.getopt.LongOpt;
+
+import org.infinispan.Version;
+
+/**
+ * Main.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class Main {
+
+ public static final String PROP_KEY_PORT = "infinispan.memcached.port";
+ public static final String PROP_KEY_HOST = "infinispan.memcached.host";
+ public static final String PROP_KEY_MASTER_THREADS = "infinispan.memcached.master.threads";
+ public static final String PROP_KEY_WORKER_THREADS = "infinispan.memcached.worker.threads";
+ public static final String PROP_KEY_CACHE_CONFIG = "infinispan.memcached.cache.config";
+
+ public static final int PORT_DEFAULT = 11211;
+ public static final String HOST_DEFAULT = "127.0.0.1";
+ public static final int MASTER_THREADS_DEFAULT = 0;
+ public static final int WORKER_THREADS_DEFAULT = 0;
+
+ /**
+ * Server properties. This object holds all of the required
+ * information to get the server up and running. Use System
+ * properties for defaults.
+ */
+ private Map<String, String> props = new HashMap<String, String>();
+
+ private TextServer server;
+
+ /**
+ * Explicit constructor.
+ */
+ public Main() {
+ // Set default properties
+ Properties sysProps = System.getProperties();
+ for (Object propName : sysProps.keySet()) {
+ String propNameString = (String) propName;
+ String propValue = (String) sysProps.get(propNameString);
+ props.put(propNameString, propValue);
+ }
+ }
+
+ public void boot(String[] args) throws Exception {
+ // First process the command line to pickup custom props/settings
+ processCommandLine(args);
+
+ int port = props.get(PROP_KEY_PORT) == null ? PORT_DEFAULT : Integer.parseInt(props.get(PROP_KEY_PORT));
+ String host = props.get(PROP_KEY_HOST) == null ? HOST_DEFAULT : props.get(PROP_KEY_HOST);
+ int masterThreads = props.get(PROP_KEY_MASTER_THREADS) == null ? MASTER_THREADS_DEFAULT : Integer.parseInt(props.get(PROP_KEY_MASTER_THREADS));
+ if (masterThreads < 0) {
+ throw new IllegalArgumentException("Master threads can't be lower than 0: " + masterThreads);
+ }
+ int workerThreads = props.get(PROP_KEY_WORKER_THREADS) == null ? WORKER_THREADS_DEFAULT : Integer.parseInt(props.get(PROP_KEY_WORKER_THREADS));
+ if (workerThreads < 0) {
+ throw new IllegalArgumentException("Worker threads can't be lower than 0: " + masterThreads);
+ }
+ String configFile = props.get(PROP_KEY_CACHE_CONFIG);
+
+ server = new TextServer(host, port, configFile, masterThreads, workerThreads);
+ // Make a shutdown hook
+ addShutdownHook(new ShutdownHook(server));
+
+ server.start();
+ }
+
+ private void processCommandLine(String[] args) throws Exception {
+ // set this from a system property or default to jboss
+ String programName = System.getProperty("program.name", "memcached");
+ String sopts = "-:hD:Vp:l:m:t:c";
+ LongOpt[] lopts =
+ {new LongOpt("help", LongOpt.NO_ARGUMENT, null, 'h'),
+ new LongOpt("version", LongOpt.NO_ARGUMENT, null, 'V'),
+ new LongOpt("port", LongOpt.REQUIRED_ARGUMENT, null, 'p'),
+ new LongOpt("host", LongOpt.REQUIRED_ARGUMENT, null, 'l'),
+ new LongOpt("master_threads", LongOpt.REQUIRED_ARGUMENT, null, 'm'),
+ new LongOpt("worker_threads", LongOpt.REQUIRED_ARGUMENT, null, 't'),
+ new LongOpt("cache_config", LongOpt.REQUIRED_ARGUMENT, null, 'c'),};
+ Getopt getopt = new Getopt(programName, args, sopts, lopts);
+
+ int code;
+ while ((code = getopt.getopt()) != -1)
+ {
+ switch (code)
+ {
+ case ':' :
+ case '?' :
+ // for now both of these should exit with error status
+ System.exit(1);
+ break; // for completeness
+ case 1 :
+ // this will catch non-option arguments
+ // (which we don't currently care about)
+ System.err.println(programName + ": unused non-option argument: " + getopt.getOptarg());
+ break; // for completeness
+ case 'h' :
+ // show command line help
+ System.out.println("usage: " + programName + " [options]");
+ System.out.println();
+ System.out.println("options:");
+ System.out.println(" -h, --help Show this help message");
+ System.out.println(" -V, --version Show version information");
+ System.out.println(" -- Stop processing options");
+ System.out.println(" -p, --port=<num> TCP port number to listen on (default: 11211)");
+ System.out.println(" -l, --host=<host or ip> Interface to listen on (default: 127.0.0.1, localhost)");
+ System.out.println(" -m, --master_threads=<num> Number of threads accepting incoming connections. (default: unlimited while resources are available)");
+ System.out.println(" -t, --work_threads=<num> Number of threads processing incoming requests and sending responses. (default: unlimited while resources are available)");
+ System.out.println(" -c, --cache_config=<filename> Cache configuration file (default: creates caches with default values)");
+ System.out.println(" -D<name>[=<value>] Set a system property");
+ System.out.println();
+ System.exit(0);
+ break; // for completeness
+ case 'V' :
+ Version.printFullVersionInformation();
+ break;
+ case 'p' :
+ props.put(PROP_KEY_PORT, getopt.getOptarg());
+ break;
+ case 'l' :
+ props.put(PROP_KEY_HOST, getopt.getOptarg());
+ break;
+ case 'm' :
+ props.put(PROP_KEY_MASTER_THREADS, getopt.getOptarg());
+ break;
+ case 't' :
+ props.put(PROP_KEY_WORKER_THREADS, getopt.getOptarg());
+ break;
+ case 'c' :
+ props.put(PROP_KEY_CACHE_CONFIG, getopt.getOptarg());
+ break;
+ case 'D' :
+ // set a system property
+ String arg = getopt.getOptarg();
+ String name, value;
+ int i = arg.indexOf("=");
+ if (i == -1) {
+ name = arg;
+ value = "true";
+ } else {
+ name = arg.substring(0, i);
+ value = arg.substring(i + 1, arg.length());
+ }
+ System.setProperty(name, value);
+ break;
+ default :
+ // this should not happen,
+ // if it does throw an error so we know about it
+ throw new Error("unhandled option code: " + code);
+ }
+ }
+ }
+
+ public static void main(final String[] args) throws Exception {
+ Callable<Void> worker = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ try {
+ Main main = new Main();
+ main.boot(args);
+ } catch (Exception e) {
+ System.err.println("Failed to boot JBoss:");
+ e.printStackTrace();
+ throw e;
+ }
+ return null;
+ }
+ };
+
+ Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, System.getProperty("program.name") + "-main");
+ t.setDaemon(true);
+ return t;
+ }
+ }).submit(worker);
+ }
+
+ /**
+ * Adds the specified shutdown hook
+ *
+ * @param shutdownHook
+ */
+ static void addShutdownHook(final Thread shutdownHook) {
+ AccessController.doPrivileged(new PrivilegedAction<Void>() {
+ public Void run() {
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+ return null;
+ }
+ });
+ }
+
+ /**
+ * ShutdownHook
+ */
+ private static class ShutdownHook extends Thread {
+ private final TextServer server;
+
+ ShutdownHook(TextServer server) {
+ this.server = server;
+ }
+
+ @Override
+ public void run() {
+ // If we have a server
+ if (server != null) {
+ // Log out
+ System.out.println("Posting Shutdown Request to the memcached server...");
+ // Start in new thread to give positive feedback to requesting client of success.
+ Future<Void> f = Executors.newSingleThreadExecutor().submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ server.stop();
+ return null;
+ }
+ });
+
+ // Block until done
+ try {
+ f.get();
+ } catch (InterruptedException ie) {
+ // Clear the flag
+ Thread.interrupted();
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Exception encountered in shutting down the server", e);
+ }
+ }
+ }
+ }
+
+}
Deleted: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java 2010-01-11 11:57:30 UTC (rev 1358)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java 2010-01-11 15:12:19 UTC (rev 1359)
@@ -1,76 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
- * individual contributors as indicated by the @author tags. See the
- * copyright.txt file in the distribution for a full listing of
- * individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.infinispan.server.memcached;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.infinispan.Cache;
-import org.infinispan.server.core.ServerBootstrap;
-import org.infinispan.server.core.netty.NettyChannelPipelineFactory;
-import org.infinispan.server.core.netty.NettyChannelUpstreamHandler;
-import org.infinispan.server.core.netty.NettyServerBootstrap;
-import org.infinispan.server.core.netty.memcached.NettyMemcachedDecoder;
-import org.infinispan.server.core.InterceptorChain;
-import org.infinispan.server.memcached.commands.TextCommandHandler;
-import org.infinispan.server.memcached.interceptors.TextProtocolInterceptorChainFactory;
-
-/**
- * TextServer.
- *
- * @author Galder Zamarreño
- * @since 4.0
- */
-public class MemcachedTextServer {
- private final Cache cache;
- private final int port;
- private final ScheduledExecutorService scheduler;
- private ServerBootstrap bootstrap;
-
- public MemcachedTextServer(Cache cache, int port) {
- this.cache = cache;
- this.port = port;
- this.scheduler = Executors.newScheduledThreadPool(1);
- }
-
- public int getPort() {
- return port;
- }
-
- public void start() {
- InterceptorChain chain = TextProtocolInterceptorChainFactory.getInstance(cache).buildInterceptorChain();
- NettyMemcachedDecoder decoder = new NettyMemcachedDecoder(cache, chain, scheduler);
- TextCommandHandler commandHandler = new TextCommandHandler(cache, chain);
- NettyChannelUpstreamHandler handler = new NettyChannelUpstreamHandler(commandHandler);
- NettyChannelPipelineFactory pipelineFactory = new NettyChannelPipelineFactory(decoder, handler);
- bootstrap = new NettyServerBootstrap(pipelineFactory, new InetSocketAddress(port));
- bootstrap.start();
- }
-
- public void stop() {
- bootstrap.stop();
- cache.stop();
- scheduler.shutdown();
- }
-}
Copied: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java (from rev 1357, trunk/server/memcached/src/main/java/org/infinispan/server/memcached/MemcachedTextServer.java)
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java (rev 0)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java 2010-01-11 15:12:19 UTC (rev 1359)
@@ -0,0 +1,156 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.server.memcached;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.infinispan.Cache;
+import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.server.core.ServerBootstrap;
+import org.infinispan.server.core.netty.NettyServerBootstrap;
+import org.infinispan.server.core.netty.memcached.NettyMemcachedDecoder;
+import org.infinispan.server.core.InterceptorChain;
+import org.infinispan.server.memcached.commands.TextCommandHandler;
+import org.infinispan.server.memcached.interceptors.TextProtocolInterceptorChainFactory;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+/**
+ * TextServer.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class TextServer {
+ private static final Log log = LogFactory.getLog(TextServer.class);
+ private final Cache cache;
+ private final String host;
+ private final int port;
+ private final int masterThreads;
+ private final int workerThreads;
+ private final ScheduledExecutorService scheduler;
+ private ServerBootstrap bootstrap;
+ private ExecutorService masterExecutor;
+ private ExecutorService workerExecutor;
+ private final AtomicInteger masterThreadNumber = new AtomicInteger(1);
+ private final AtomicInteger workerThreadNumber = new AtomicInteger(1);
+
+ public TextServer(String host, int port, String configFile, int masterThreads, int workerThreads) throws IOException {
+ this(host, port, configFile == null
+ ? new DefaultCacheManager().getCache()
+ : new DefaultCacheManager(configFile).getCache(),
+ masterThreads, masterThreads);
+ if (configFile == null) {
+ log.debug("Using cache manager using configuration defaults");
+ } else {
+ log.debug("Using cache manager configured from {0}", configFile);
+ }
+ }
+
+ public TextServer(String host, int port, Cache cache, int masterThreads, int workerThreads) throws IOException {
+ this.host = host;
+ this.port = port;
+ this.masterThreads = masterThreads;
+ this.workerThreads = workerThreads;
+ this.cache = cache;
+ this.scheduler = Executors.newScheduledThreadPool(1);
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void start() throws Exception {
+ InterceptorChain chain = TextProtocolInterceptorChainFactory.getInstance(cache).buildInterceptorChain();
+ NettyMemcachedDecoder decoder = new NettyMemcachedDecoder(cache, chain, scheduler);
+ TextCommandHandler commandHandler = new TextCommandHandler(cache, chain);
+
+ ThreadFactory tf = new MemcachedThreadFactory(cache, ExecutorType.MASTER);
+ if (masterThreads == 0) {
+ log.debug("Configured unlimited threads for master thread pool");
+ masterExecutor = Executors.newCachedThreadPool(tf);
+ } else {
+ log.debug("Configured {0} threads for master thread pool", masterThreads);
+ masterExecutor = Executors.newFixedThreadPool(masterThreads, tf);
+ }
+
+ tf = new MemcachedThreadFactory(cache, ExecutorType.WORKER);
+ if (workerThreads == 0) {
+ log.debug("Configured unlimited threads for worker thread pool");
+ workerExecutor = Executors.newCachedThreadPool(tf);
+ } else {
+ log.debug("Configured {0} threads for worker thread pool", workerThreads);
+ workerExecutor = Executors.newFixedThreadPool(workerThreads, tf);
+ }
+
+ bootstrap = new NettyServerBootstrap(commandHandler, decoder, new InetSocketAddress(host, port),
+ masterExecutor, workerExecutor, workerThreads);
+ bootstrap.start();
+ log.info("Started Memcached text server bound to {0}:{1}", host, port);
+ }
+
+ public void stop() {
+ masterExecutor.shutdown();
+ workerExecutor.shutdown();
+ bootstrap.stop();
+ cache.stop();
+ scheduler.shutdown();
+ }
+
+ private static class MemcachedThreadFactory implements ThreadFactory {
+ final Cache cache;
+ final ExecutorType type;
+
+ MemcachedThreadFactory(Cache cache, ExecutorType type) {
+ this.cache = cache;
+ this.type = type;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, System.getProperty("program.name") + "-" + cache.getName() + '-' + type.toString().toLowerCase() + '-' + type.getAndIncrement());
+ t.setDaemon(true);
+ return t;
+ }
+ }
+
+ private static enum ExecutorType {
+ MASTER(1), WORKER(1);
+
+ final AtomicInteger threadCounter;
+
+ ExecutorType(int startIndex) {
+ this.threadCounter = new AtomicInteger(startIndex);
+ }
+
+ int getAndIncrement() {
+ return threadCounter.getAndIncrement();
+ }
+ }
+}
Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/ClusterTest.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/ClusterTest.java 2010-01-11 11:57:30 UTC (rev 1358)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/ClusterTest.java 2010-01-11 15:12:19 UTC (rev 1359)
@@ -39,6 +39,7 @@
import org.infinispan.config.Configuration;
import org.infinispan.server.memcached.test.MemcachedTestingUtil;
import org.infinispan.test.MultipleCacheManagersTest;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
/**
@@ -51,8 +52,8 @@
public class ClusterTest extends MultipleCacheManagersTest {
MemcachedClient client1;
MemcachedClient client2;
- MemcachedTextServer server1;
- MemcachedTextServer server2;
+ TextServer server1;
+ TextServer server2;
@Override
protected void createCacheManagers() throws Throwable {
@@ -66,6 +67,12 @@
client2 = createMemcachedClient(60000, server2.getPort());
}
+ @AfterClass(alwaysRun=true)
+ protected void destroyAfterClass() {
+ server1.stop();
+ server2.stop();
+ }
+
public void testReplicatedSet(Method m) throws Exception {
Future<Boolean> f = client1.set(k(m), 0, v(m));
assert f.get(120, TimeUnit.SECONDS);
Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java 2010-01-11 11:57:30 UTC (rev 1358)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/FunctionalTest.java 2010-01-11 15:12:19 UTC (rev 1359)
@@ -39,6 +39,8 @@
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
@@ -52,8 +54,9 @@
*/
@Test(groups = "functional", testName = "server.memcached.FunctionalTest")
public class FunctionalTest extends SingleCacheManagerTest {
+ static final Log log = LogFactory.getLog(FunctionalTest.class);
MemcachedClient client;
- MemcachedTextServer server;
+ TextServer server;
@Override
protected CacheManager createCacheManager() throws Exception {
@@ -66,6 +69,7 @@
@AfterClass(alwaysRun=true)
protected void destroyAfterClass() {
+ log.debug("Test finished, close memcached server");
server.stop();
}
Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java 2010-01-11 11:57:30 UTC (rev 1358)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/StatsTest.java 2010-01-11 15:12:19 UTC (rev 1359)
@@ -52,7 +52,7 @@
public class StatsTest extends SingleCacheManagerTest {
static final String JMX_DOMAIN = StatsTest.class.getSimpleName();
MemcachedClient client;
- MemcachedTextServer server;
+ TextServer server;
@Override
protected CacheManager createCacheManager() throws Exception {
Modified: trunk/server/memcached/src/test/java/org/infinispan/server/memcached/test/MemcachedTestingUtil.java
===================================================================
--- trunk/server/memcached/src/test/java/org/infinispan/server/memcached/test/MemcachedTestingUtil.java 2010-01-11 11:57:30 UTC (rev 1358)
+++ trunk/server/memcached/src/test/java/org/infinispan/server/memcached/test/MemcachedTestingUtil.java 2010-01-11 15:12:19 UTC (rev 1359)
@@ -29,7 +29,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.Cache;
-import org.infinispan.server.memcached.MemcachedTextServer;
+import org.infinispan.server.memcached.TextServer;
import net.spy.memcached.DefaultConnectionFactory;
import net.spy.memcached.MemcachedClient;
@@ -41,6 +41,7 @@
* @since 4.0
*/
public class MemcachedTestingUtil {
+ private static final String HOST = "127.0.0.1";
private static final ThreadLocal<Integer> threadMemcachedPort = new ThreadLocal<Integer>() {
private final AtomicInteger uniqueAddr = new AtomicInteger(11211);
@@ -77,11 +78,11 @@
return new MemcachedClient(d, Arrays.asList(new InetSocketAddress[]{new InetSocketAddress(port)}));
}
- public static MemcachedTextServer createMemcachedTextServer(Cache cache) {
- return new MemcachedTextServer(cache, threadMemcachedPort.get());
+ public static TextServer createMemcachedTextServer(Cache cache) throws IOException {
+ return new TextServer(HOST, threadMemcachedPort.get().intValue(), cache, 0, 0);
}
- public static MemcachedTextServer createMemcachedTextServer(Cache cache, int port) {
- return new MemcachedTextServer(cache, port);
+ public static TextServer createMemcachedTextServer(Cache cache, int port) throws IOException {
+ return new TextServer(HOST, port, cache, 0, 0);
}
}
More information about the infinispan-commits
mailing list