[infinispan-commits] Infinispan SVN: r1886 - in trunk: client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport and 8 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Jun 2 12:21:07 EDT 2010
Author: galder.zamarreno at jboss.com
Date: 2010-06-02 12:21:05 -0400 (Wed, 02 Jun 2010)
New Revision: 1886
Modified:
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerRestartTest.java
trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
Log:
[ISPN-476] (Enable tcp no delay by default and make it configurable in servers) Implemented flag switch. Tidied up ProtocolServer interface to take a Properties object instead of individual parameters as it makes it more easily extensible.
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java 2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java 2010-06-02 16:21:05 UTC (rev 1886)
@@ -58,11 +58,14 @@
* servers according to this strategy. Defaults to {@link org.infinispan.client.hotrod.impl.transport.tcp.RoundRobinBalancingStrategy}
* </li>
* <li>
- * force-return-value - weather ot not to implicitelly {@link org.infinispan.client.hotrod.Flag#FORCE_RETURN_VALUE} for all calls.
+ * force-return-value - whether or not to implicitly apply {@link org.infinispan.client.hotrod.Flag#FORCE_RETURN_VALUE} to all calls.
* Defaults to false.
* </li>
+ * <li>
+ * tcp-no-delay - TCP no delay flag switch. Defaults to true.
+ * </li>
* <br/>
- * <i>bellow is connection pooling config</i>:
+ * <i>below is connection pooling config</i>:
* <p/>
* <li>maxActive - controls the maximum number of connections per server that are allocated (checked out to client threads, or idle in
* the pool) at one time. When non-positive, there is no limit to the number of connections per server. When maxActive
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java 2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java 2010-06-02 16:21:05 UTC (rev 1886)
@@ -30,4 +30,6 @@
void updateHashFunction(LinkedHashMap<InetSocketAddress,Integer> servers2HashCode, int numKeyOwners, short hashFunctionVersion, int hashSpace);
Transport getTransport(byte[] key);
+
+ boolean isTcpNoDelay();
}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java 2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java 2010-06-02 16:21:05 UTC (rev 1886)
@@ -40,6 +40,7 @@
// Set up the event pipeline factory.
bootstrap.setPipelineFactory(new HotRodClientPipelaneFactory(decoder));
+ bootstrap.setOption("tcpNoDelay", getTransportFactory().isTcpNoDelay());
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(serverAddress);
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java 2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java 2010-06-02 16:21:05 UTC (rev 1886)
@@ -21,11 +21,14 @@
private InetSocketAddress serverAddr;
private Collection<InetSocketAddress> serverAddresses;
+ private boolean tcpNoDelay;
@Override
public void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers, AtomicInteger topologyId) {
this.serverAddresses = staticConfiguredServers;
serverAddr = serverAddresses.iterator().next();
+ tcpNoDelay = Boolean.valueOf(props.getProperty("tcp-no-delay", "true"));
+ if (log.isDebugEnabled()) log.debug("TCP no delay flag value is: {0}", tcpNoDelay);
}
@Override
@@ -58,4 +61,9 @@
public Transport getTransport(byte[] key) {
return getTransport();
}
+
+ @Override
+ public boolean isTcpNoDelay() {
+ return tcpNoDelay;
+ }
}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java 2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java 2010-06-02 16:21:05 UTC (rev 1886)
@@ -33,6 +33,7 @@
try {
SocketChannel socketChannel = SocketChannel.open(serverAddress);
socket = socketChannel.socket();
+ socket.setTcpNoDelay(transportFactory.isTcpNoDelay());
} catch (IOException e) {
String message = "Could not connect to server: " + serverAddress;
log.error(message, e);
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java 2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java 2010-06-02 16:21:05 UTC (rev 1886)
@@ -36,6 +36,7 @@
private volatile RequestBalancingStrategy balancer;
private volatile Collection<InetSocketAddress> servers;
private volatile ConsistentHash consistentHash;
+ private volatile boolean tcpNoDelay;
private final ConsistentHashFactory hashFactory = new ConsistentHashFactory();
@Override
@@ -44,6 +45,8 @@
servers = staticConfiguredServers;
String balancerClass = props.getProperty("request-balancing-strategy", RoundRobinBalancingStrategy.class.getName());
balancer = (RequestBalancingStrategy) VHelper.newInstance(balancerClass);
+ tcpNoDelay = Boolean.valueOf(props.getProperty("tcp-no-delay", "true"));
+ if (log.isDebugEnabled()) log.debug("TCP no delay flag value is: {0}", tcpNoDelay);
PropsKeyedObjectPoolFactory poolFactory = new PropsKeyedObjectPoolFactory(new TransportObjectFactory(this, topologyId), props);
connectionPool = (GenericKeyedObjectPool) poolFactory.createPool();
balancer.setServers(servers);
@@ -174,4 +177,8 @@
public ConsistentHash getConsistentHash() {
return consistentHash;
}
+
+ public boolean isTcpNoDelay() {
+ return tcpNoDelay;
+ }
}
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerRestartTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerRestartTest.java 2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerRestartTest.java 2010-06-02 16:21:05 UTC (rev 1886)
@@ -9,6 +9,8 @@
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
/**
@@ -56,8 +58,16 @@
int port = hotrodServer.getPort();
hotrodServer.stop();
- hotrodServer.start("localhost", port, cacheManager, 2, 2, 20000);
+ Properties properties = new Properties();
+ properties.setProperty("infinispan.server.host", "localhost");
+ properties.setProperty("infinispan.server.port", Integer.toString(port));
+ properties.setProperty("infinispan.server.master.threads", "2");
+ properties.setProperty("infinispan.server.worker.threads", "2");
+ properties.setProperty("infinispan.server.idle.timeout", "20000");
+ properties.setProperty("infinispan.server.tcp.no.delay", "true");
+ hotrodServer.start(properties, cacheManager);
+
Thread.sleep(3000);
assert defaultRemote.get("k").equals("v");
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala 2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala 2010-06-02 16:21:05 UTC (rev 1886)
@@ -4,7 +4,9 @@
import transport.netty.{EncoderAdapter, NettyTransport}
import transport.Transport
import org.infinispan.server.core.VersionGenerator._
-import org.infinispan.manager.{EmbeddedCacheManager, CacheManager}
+import org.infinispan.manager.EmbeddedCacheManager
+import org.infinispan.server.core.Main._
+import java.util.Properties
/**
* // TODO: Document this
@@ -19,12 +21,14 @@
private var transport: Transport = _
private var cacheManager: EmbeddedCacheManager = _
- override def start(host: String, port: Int, cacheManager: EmbeddedCacheManager, masterThreads: Int, workerThreads: Int, idleTimeout: Int) {
- this.host = host
- this.port = port
- this.masterThreads = masterThreads
- this.workerThreads = workerThreads
+ override def start(properties: Properties, cacheManager: EmbeddedCacheManager) {
+ this.host = properties.getProperty(PROP_KEY_HOST)
+ this.port = properties.getProperty(PROP_KEY_PORT).toInt
+ this.masterThreads = properties.getProperty(PROP_KEY_MASTER_THREADS).toInt
+ this.workerThreads = properties.getProperty(PROP_KEY_WORKER_THREADS).toInt
this.cacheManager = cacheManager
+ val idleTimeout = properties.getProperty(PROP_KEY_IDLE_TIMEOUT).toInt
+ val tcpNoDelay = properties.getProperty(PROP_KEY_TCP_NO_DELAY).toBoolean
// Register rank calculator before starting any cache so that we can capture all view changes
cacheManager.addListener(getRankCalculatorListener)
@@ -33,7 +37,8 @@
val address = new InetSocketAddress(host, port)
val encoder = getEncoder
val nettyEncoder = if (encoder != null) new EncoderAdapter(encoder) else null
- transport = new NettyTransport(this, nettyEncoder, address, masterThreads, workerThreads, idleTimeout, threadNamePrefix)
+ transport = new NettyTransport(this, nettyEncoder, address, masterThreads, workerThreads, idleTimeout,
+ threadNamePrefix, tcpNoDelay)
transport.start
}
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala 2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala 2010-06-02 16:21:05 UTC (rev 1886)
@@ -1,8 +1,8 @@
package org.infinispan.server.core
import scala.collection.JavaConversions._
-import collection.mutable.HashMap
import scala.collection.mutable
+import scala.collection.immutable
import org.infinispan.util.Util
import java.io.IOException
import java.security.{PrivilegedAction, AccessController}
@@ -10,6 +10,7 @@
import gnu.getopt.{Getopt, LongOpt}
import org.infinispan.Version
import org.infinispan.manager.{CacheManager, DefaultCacheManager}
+import java.util.Properties
/**
* Main class for server startup.
@@ -26,23 +27,27 @@
val PROP_KEY_CACHE_CONFIG = "infinispan.server.cache.config"
val PROP_KEY_PROTOCOL = "infinispan.server.protocol"
val PROP_KEY_IDLE_TIMEOUT = "infinispan.server.idle.timeout"
+ val PROP_KEY_TCP_NO_DELAY = "infinispan.server.tcp.no.delay"
val PORT_DEFAULT = 11211
val HOST_DEFAULT = "127.0.0.1"
- val MASTER_THREADS_DEFAULT = 0
- val WORKER_THREADS_DEFAULT = 0
- val IDLE_TIMEOUT_DEFAULT = -1
+ val MASTER_THREADS_DEFAULT = "0"
+ val WORKER_THREADS_DEFAULT = "0"
+ val IDLE_TIMEOUT_DEFAULT = "-1"
+ val TCP_NO_DELAY_DEFAULT = "true"
/**
* Server properties. This object holds all of the required
* information to get the server up and running. Use System
* properties for defaults.
*/
- private val props: mutable.Map[String, String] = {
+ private val props: Properties = {
// Set default properties
- val properties = new HashMap[String, String]
- val sysProps = System.getProperties
- for (property <- asIterator(sysProps.iterator))
- properties.put(property._1, property._2)
+ val properties = new Properties(System.getProperties)
+ properties.setProperty(PROP_KEY_HOST, HOST_DEFAULT)
+ properties.setProperty(PROP_KEY_MASTER_THREADS, MASTER_THREADS_DEFAULT)
+ properties.setProperty(PROP_KEY_WORKER_THREADS, WORKER_THREADS_DEFAULT)
+ properties.setProperty(PROP_KEY_IDLE_TIMEOUT, IDLE_TIMEOUT_DEFAULT)
+ properties.setProperty(PROP_KEY_TCP_NO_DELAY, TCP_NO_DELAY_DEFAULT)
properties
}
@@ -79,52 +84,65 @@
// First process the command line to pickup custom props/settings
processCommandLine(args)
- val host = if (props.get(PROP_KEY_HOST) == None) HOST_DEFAULT else props.get(PROP_KEY_HOST).get
- val masterThreads = if (props.get(PROP_KEY_MASTER_THREADS) == None) MASTER_THREADS_DEFAULT else props.get(PROP_KEY_MASTER_THREADS).get.toInt
+ val properties = new Properties
+
+ val masterThreads = props.getProperty(PROP_KEY_MASTER_THREADS).toInt
if (masterThreads < 0)
throw new IllegalArgumentException("Master threads can't be lower than 0: " + masterThreads)
-
- val workerThreads = if (props.get(PROP_KEY_WORKER_THREADS) == None) WORKER_THREADS_DEFAULT else props.get(PROP_KEY_WORKER_THREADS).get.toInt
+
+ val workerThreads = props.getProperty(PROP_KEY_WORKER_THREADS).toInt
if (workerThreads < 0)
throw new IllegalArgumentException("Worker threads can't be lower than 0: " + masterThreads)
- val configFile = props.get(PROP_KEY_CACHE_CONFIG)
- var protocol = props.get(PROP_KEY_PROTOCOL)
- if (protocol == None) {
+ var protocol = props.getProperty(PROP_KEY_PROTOCOL)
+ if (protocol == null) {
System.err.println("ERROR: Please indicate protocol to run with -r parameter")
showAndExit
}
- val idleTimeout = if (props.get(PROP_KEY_IDLE_TIMEOUT) == None) IDLE_TIMEOUT_DEFAULT else props.get(PROP_KEY_IDLE_TIMEOUT).get.toInt
+
+ val idleTimeout = props.getProperty(PROP_KEY_IDLE_TIMEOUT).toInt
if (idleTimeout < -1)
throw new IllegalArgumentException("Idle timeout can't be lower than -1: " + idleTimeout)
+ val tcpNoDelay = props.getProperty(PROP_KEY_TCP_NO_DELAY)
+ try {
+ tcpNoDelay.toBoolean
+ } catch {
+ case n: NumberFormatException => {
+ throw new IllegalArgumentException("TCP no delay flag switch must be a boolean: " + tcpNoDelay)
+ }
+ }
+
// TODO: move class name and protocol number to a resource file under the corresponding project
- val clazz = protocol.get match {
+ val clazz = protocol match {
case "memcached" => "org.infinispan.server.memcached.MemcachedServer"
case "hotrod" => "org.infinispan.server.hotrod.HotRodServer"
case "websocket" => "org.infinispan.server.websocket.WebSocketServer"
}
+ val server = Util.getInstance(clazz).asInstanceOf[ProtocolServer]
+
val port = {
- if (props.get(PROP_KEY_PORT) == None) {
- protocol.get match {
+ if (props.getProperty(PROP_KEY_PORT) == null) {
+ protocol match {
case "memcached" => 11211
case "hotrod" => 11311
case "websocket" => 8181
}
} else {
- props.get(PROP_KEY_PORT).get.toInt
+ props.getProperty(PROP_KEY_PORT).toInt
}
}
+ props.setProperty(PROP_KEY_PORT, port.toString)
- var server = Util.getInstance(clazz).asInstanceOf[ProtocolServer]
- val cacheManager = if (configFile == None) new DefaultCacheManager else new DefaultCacheManager(configFile.get)
+ val configFile = props.getProperty(PROP_KEY_CACHE_CONFIG)
+ val cacheManager = if (configFile == null) new DefaultCacheManager else new DefaultCacheManager(configFile)
addShutdownHook(new ShutdownHook(server, cacheManager))
- server.start(host, port, cacheManager, masterThreads, workerThreads, idleTimeout)
+ server.start(props, cacheManager)
}
private def processCommandLine(args: Array[String]) {
programName = System.getProperty("program.name", "startServer")
- var sopts = "-:hD:Vp:l:m:t:c:r:i:"
+ var sopts = "-:hD:Vp:l:m:t:c:r:i:n:"
var lopts = Array(
new LongOpt("help", LongOpt.NO_ARGUMENT, null, 'h'),
new LongOpt("version", LongOpt.NO_ARGUMENT, null, 'V'),
@@ -134,7 +152,8 @@
new LongOpt("worker_threads", LongOpt.REQUIRED_ARGUMENT, null, 't'),
new LongOpt("cache_config", LongOpt.REQUIRED_ARGUMENT, null, 'c'),
new LongOpt("protocol", LongOpt.REQUIRED_ARGUMENT, null, 'r'),
- new LongOpt("idle_timeout", LongOpt.REQUIRED_ARGUMENT, null, 'i'))
+ new LongOpt("idle_timeout", LongOpt.REQUIRED_ARGUMENT, null, 'i'),
+ new LongOpt("tcp_no_delay", LongOpt.REQUIRED_ARGUMENT, null, 'n'))
var getopt = new Getopt(programName, args, sopts, lopts)
var code: Int = 0
while ((({code = getopt.getopt; code})) != -1) {
@@ -146,13 +165,14 @@
Version.printFullVersionInformation
System.exit(0)
}
- case 'p' => props.put(PROP_KEY_PORT, getopt.getOptarg)
- case 'l' => props.put(PROP_KEY_HOST, getopt.getOptarg)
- case 'm' => props.put(PROP_KEY_MASTER_THREADS, getopt.getOptarg)
- case 't' => props.put(PROP_KEY_WORKER_THREADS, getopt.getOptarg)
- case 'c' => props.put(PROP_KEY_CACHE_CONFIG, getopt.getOptarg)
- case 'r' => props.put(PROP_KEY_PROTOCOL, getopt.getOptarg)
- case 'i' => props.put(PROP_KEY_IDLE_TIMEOUT, getopt.getOptarg)
+ case 'p' => props.setProperty(PROP_KEY_PORT, getopt.getOptarg)
+ case 'l' => props.setProperty(PROP_KEY_HOST, getopt.getOptarg)
+ case 'm' => props.setProperty(PROP_KEY_MASTER_THREADS, getopt.getOptarg)
+ case 't' => props.setProperty(PROP_KEY_WORKER_THREADS, getopt.getOptarg)
+ case 'c' => props.setProperty(PROP_KEY_CACHE_CONFIG, getopt.getOptarg)
+ case 'r' => props.setProperty(PROP_KEY_PROTOCOL, getopt.getOptarg)
+ case 'i' => props.setProperty(PROP_KEY_IDLE_TIMEOUT, getopt.getOptarg)
+ case 'n' => props.setProperty(PROP_KEY_TCP_NO_DELAY, getopt.getOptarg)
case 'D' => {
val arg = getopt.getOptarg
var name = ""
@@ -208,6 +228,8 @@
println(" If no new messages have been read within this time, the server disconnects the channel.")
println(" Passing -1 disables idle timeout.")
println
+ println(" -n, --tcp_no_delay=[true|false] TCP no delay flag switch (default: true).")
+ println
println(" -D<name>[=<value>] Set a system property")
println
System.exit(0)
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala 2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala 2010-06-02 16:21:05 UTC (rev 1886)
@@ -2,6 +2,7 @@
import transport.{Decoder, Encoder}
import org.infinispan.manager.{EmbeddedCacheManager}
+import java.util.Properties
/**
* // TODO: Document this
@@ -9,8 +10,15 @@
* @since 4.1
*/
trait ProtocolServer {
- def start(host: String, port: Int, cacheManager: EmbeddedCacheManager, masterThreads: Int, workerThreads: Int, idleTimeout: Int)
+
+ /**
+ * Using Properties here instead of a Map in order to make it easier for Java code to call in.
+ */
+ def start(properties: Properties, cacheManager: EmbeddedCacheManager)
+
def stop
+
def getEncoder: Encoder
+
def getDecoder: Decoder
}
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala 2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala 2010-06-02 16:21:05 UTC (rev 1886)
@@ -18,7 +18,7 @@
*/
class NettyTransport(server: ProtocolServer, encoder: ChannelDownstreamHandler,
address: SocketAddress, masterThreads: Int, workerThreads: Int,
- idleTimeout: Int, threadNamePrefix: String) extends Transport {
+ idleTimeout: Int, threadNamePrefix: String, tcpNoDelay: Boolean) extends Transport {
import NettyTransport._
private val serverChannels = new DefaultChannelGroup(threadNamePrefix + "-Channels")
@@ -70,6 +70,7 @@
})
val bootstrap = new ServerBootstrap(factory);
bootstrap.setPipelineFactory(pipeline);
+ bootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
val ch = bootstrap.bind(address);
serverChannels.add(ch);
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-06-02 16:21:05 UTC (rev 1886)
@@ -9,11 +9,11 @@
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent
import scala.collection.JavaConversions._
import java.util.concurrent.TimeUnit._
-import java.util.Random
import org.infinispan.util.Util
import org.infinispan.{CacheException, Cache}
import org.infinispan.remoting.transport.Address
-import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.manager.EmbeddedCacheManager
+import java.util.{Properties, Random};
/**
* // TODO: Document this
@@ -34,8 +34,8 @@
override def getDecoder: Decoder = new HotRodDecoder(getCacheManager)
- override def start(host: String, port: Int, cacheManager: EmbeddedCacheManager, masterThreads: Int, workerThreads: Int, idleTimeout: Int) {
- super.start(host, port, cacheManager, masterThreads, workerThreads, idleTimeout)
+ override def start(properties: Properties, cacheManager: EmbeddedCacheManager) {
+ super.start(properties, cacheManager)
// Start defined caches to avoid issues with lazily started caches
for (cacheName <- asIterator(cacheManager.getCacheNames.iterator))
cacheManager.getCache(cacheName)
@@ -43,7 +43,7 @@
isClustered = cacheManager.getGlobalConfiguration.getTransportClass != null
// If clustered, set up a cache for topology information
if (isClustered)
- addSelfToTopologyView(host, port, cacheManager)
+ addSelfToTopologyView(getHost, getPort, cacheManager)
}
private def addSelfToTopologyView(host: String, port: Int, cacheManager: EmbeddedCacheManager) {
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala 2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala 2010-06-02 16:21:05 UTC (rev 1886)
@@ -3,7 +3,6 @@
import java.util.concurrent.atomic.AtomicInteger
import java.lang.reflect.Method
import org.infinispan.server.core.Logging
-import java.util.Arrays
import org.infinispan.server.hotrod.OperationStatus._
import org.testng.Assert._
import org.infinispan.util.Util
@@ -11,6 +10,8 @@
import org.infinispan.config.Configuration.CacheMode
import org.infinispan.config.Configuration
import org.infinispan.manager.EmbeddedCacheManager
+import org.infinispan.server.core.Main._
+import java.util.{Properties, Arrays}
/**
* // TODO: Document this
@@ -36,10 +37,21 @@
cacheManager.defineConfiguration(TopologyCacheName, createTopologyCacheConfig)
}
}
- server.start(host, port, manager, 0, 0, idleTimeout)
+ server.start(getProperties(host, port, idleTimeout), manager)
server
}
+ private def getProperties(host: String, port: Int, idleTimeout: Int): Properties = {
+ val properties = new Properties
+ properties.setProperty(PROP_KEY_HOST, host)
+ properties.setProperty(PROP_KEY_PORT, port.toString)
+ properties.setProperty(PROP_KEY_MASTER_THREADS, "0")
+ properties.setProperty(PROP_KEY_WORKER_THREADS, "0")
+ properties.setProperty(PROP_KEY_IDLE_TIMEOUT, idleTimeout.toString)
+ properties.setProperty(PROP_KEY_TCP_NO_DELAY, "true")
+ properties
+ }
+
def startCrashingHotRodServer(manager: EmbeddedCacheManager, port: Int): HotRodServer = {
val server = new HotRodServer {
import HotRodServer._
@@ -52,7 +64,7 @@
// but has been evicted from JGroups cluster.
}
}
- server.start(host, port, manager, 0, 0, 0)
+ server.start(getProperties(host, port, 0), manager)
server
}
Modified: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala 2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala 2010-06-02 16:21:05 UTC (rev 1886)
@@ -2,12 +2,13 @@
import java.lang.reflect.Method
import net.spy.memcached.{DefaultConnectionFactory, MemcachedClient}
-import java.util.Arrays
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicInteger
import org.infinispan.server.core.transport.Decoder
import org.infinispan.server.memcached.{MemcachedDecoder, MemcachedValue, MemcachedServer}
import org.infinispan.manager.EmbeddedCacheManager
+import java.util.{Properties, Arrays}
+import org.infinispan.server.core.Main._
/**
* // TODO: Document this
@@ -37,10 +38,21 @@
def startMemcachedTextServer(cacheManager: EmbeddedCacheManager, port: Int): MemcachedServer = {
val server = new MemcachedServer
- server.start(host, port, cacheManager, 0, 0, 0)
+ server.start(getProperties(host, port), cacheManager)
server
}
+ private def getProperties(host: String, port: Int): Properties = {
+ val properties = new Properties
+ properties.setProperty(PROP_KEY_HOST, host)
+ properties.setProperty(PROP_KEY_PORT, port.toString)
+ properties.setProperty(PROP_KEY_MASTER_THREADS, "0")
+ properties.setProperty(PROP_KEY_WORKER_THREADS, "0")
+ properties.setProperty(PROP_KEY_IDLE_TIMEOUT, "0")
+ properties.setProperty(PROP_KEY_TCP_NO_DELAY, "true")
+ properties
+ }
+
def startMemcachedTextServer(cacheManager: EmbeddedCacheManager, cacheName: String): MemcachedServer = {
startMemcachedTextServer(cacheManager, UniquePortThreadLocal.get.intValue, cacheName)
}
@@ -52,7 +64,7 @@
override def startDefaultCache = getCacheManager.getCache(cacheName)
}
- server.start(host, port, cacheManager, 0, 0, 0)
+ server.start(getProperties(host, port), cacheManager)
server
}
More information about the infinispan-commits mailing list