[infinispan-commits] Infinispan SVN: r1886 - in trunk: client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport and 8 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Jun 2 12:21:07 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-06-02 12:21:05 -0400 (Wed, 02 Jun 2010)
New Revision: 1886

Modified:
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerRestartTest.java
   trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
   trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
Log:
[ISPN-476] (Enable tcp no delay by default and make it configurable in servers) Implemented flag switch. Tidied up ProtocolServer interface to take a Properties object instead of individual parameters as it makes it more easily extensible.

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java	2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java	2010-06-02 16:21:05 UTC (rev 1886)
@@ -58,11 +58,14 @@
  *  servers according to this strategy. Defaults to {@link org.infinispan.client.hotrod.impl.transport.tcp.RoundRobinBalancingStrategy}
  *  </li>
  *  <li>
- * force-return-value - weather ot not to implicitelly {@link org.infinispan.client.hotrod.Flag#FORCE_RETURN_VALUE} for all calls.
+ * force-return-value - whether or not to implicitly apply {@link org.infinispan.client.hotrod.Flag#FORCE_RETURN_VALUE} to all calls.
  * Defaults to false.
  * </li>
+ *  <li>
+ * tcp-no-delay - whether to enable TCP_NODELAY (i.e. disable Nagle's algorithm) on connections to the servers. Defaults to true.
+ * </li>
  * <br/>
- * <i>bellow is connection pooling config</i>:
+ * <i>below is connection pooling config</i>:
  * <p/>
  *  <li>maxActive - controls the maximum number of connections per server that are allocated (checked out to client threads, or idle in
  * the pool) at one time. When non-positive, there is no limit to the number of connections per server. When maxActive

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java	2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java	2010-06-02 16:21:05 UTC (rev 1886)
@@ -30,4 +30,6 @@
    void updateHashFunction(LinkedHashMap<InetSocketAddress,Integer> servers2HashCode, int numKeyOwners, short hashFunctionVersion, int hashSpace);
 
    Transport getTransport(byte[] key);
+
+   boolean isTcpNoDelay();
 }

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java	2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransport.java	2010-06-02 16:21:05 UTC (rev 1886)
@@ -40,6 +40,7 @@
 
       // Set up the event pipeline factory.
       bootstrap.setPipelineFactory(new HotRodClientPipelaneFactory(decoder));
+      bootstrap.setOption("tcpNoDelay", getTransportFactory().isTcpNoDelay());
 
       // Start the connection attempt.
       ChannelFuture future = bootstrap.connect(serverAddress);

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java	2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java	2010-06-02 16:21:05 UTC (rev 1886)
@@ -21,11 +21,14 @@
 
    private InetSocketAddress serverAddr;
    private Collection<InetSocketAddress> serverAddresses;
+   private boolean tcpNoDelay;
 
    @Override
    public void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers, AtomicInteger topologyId) {
       this.serverAddresses = staticConfiguredServers;
       serverAddr = serverAddresses.iterator().next();
+      tcpNoDelay = Boolean.valueOf(props.getProperty("tcp-no-delay", "true"));
+      if (log.isDebugEnabled()) log.debug("TCP no delay flag value is: {0}", tcpNoDelay);
    }
 
    @Override
@@ -58,4 +61,9 @@
    public Transport getTransport(byte[] key) {
       return getTransport();
    }
+
+   @Override
+   public boolean isTcpNoDelay() {
+      return tcpNoDelay;
+   }
 }

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java	2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java	2010-06-02 16:21:05 UTC (rev 1886)
@@ -33,6 +33,7 @@
       try {
          SocketChannel socketChannel = SocketChannel.open(serverAddress);
          socket = socketChannel.socket();
+         socket.setTcpNoDelay(transportFactory.isTcpNoDelay());
       } catch (IOException e) {
          String message = "Could not connect to server: " + serverAddress;
          log.error(message, e);

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-06-02 16:21:05 UTC (rev 1886)
@@ -36,6 +36,7 @@
    private volatile RequestBalancingStrategy balancer;
    private volatile Collection<InetSocketAddress> servers;
    private volatile ConsistentHash consistentHash;
+   private volatile boolean tcpNoDelay;
    private final ConsistentHashFactory hashFactory = new ConsistentHashFactory();
 
    @Override
@@ -44,6 +45,8 @@
       servers = staticConfiguredServers;
       String balancerClass = props.getProperty("request-balancing-strategy", RoundRobinBalancingStrategy.class.getName());
       balancer = (RequestBalancingStrategy) VHelper.newInstance(balancerClass);
+      tcpNoDelay = Boolean.valueOf(props.getProperty("tcp-no-delay", "true"));
+      if (log.isDebugEnabled()) log.debug("TCP no delay flag value is: {0}", tcpNoDelay);
       PropsKeyedObjectPoolFactory poolFactory = new PropsKeyedObjectPoolFactory(new TransportObjectFactory(this, topologyId), props);
       connectionPool = (GenericKeyedObjectPool) poolFactory.createPool();
       balancer.setServers(servers);
@@ -174,4 +177,8 @@
    public ConsistentHash getConsistentHash() {
       return consistentHash;
    }
+
+   public boolean isTcpNoDelay() {
+      return tcpNoDelay;
+   }
 }

Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerRestartTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerRestartTest.java	2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerRestartTest.java	2010-06-02 16:21:05 UTC (rev 1886)
@@ -9,6 +9,8 @@
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -56,8 +58,16 @@
 
       int port = hotrodServer.getPort();
       hotrodServer.stop();
-      hotrodServer.start("localhost", port, cacheManager, 2, 2, 20000);
 
+      Properties properties = new Properties();
+      properties.setProperty("infinispan.server.host", "localhost");
+      properties.setProperty("infinispan.server.port", Integer.toString(port));
+      properties.setProperty("infinispan.server.master.threads", "2");
+      properties.setProperty("infinispan.server.worker.threads", "2");
+      properties.setProperty("infinispan.server.idle.timeout", "20000");
+      properties.setProperty("infinispan.server.tcp.no.delay", "true");
+      hotrodServer.start(properties, cacheManager);
+
       Thread.sleep(3000);
 
       assert defaultRemote.get("k").equals("v");

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala	2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala	2010-06-02 16:21:05 UTC (rev 1886)
@@ -4,7 +4,9 @@
 import transport.netty.{EncoderAdapter, NettyTransport}
 import transport.Transport
 import org.infinispan.server.core.VersionGenerator._
-import org.infinispan.manager.{EmbeddedCacheManager, CacheManager}
+import org.infinispan.manager.EmbeddedCacheManager
+import org.infinispan.server.core.Main._
+import java.util.Properties
 
 /**
  * // TODO: Document this
@@ -19,12 +21,14 @@
    private var transport: Transport = _
    private var cacheManager: EmbeddedCacheManager = _
 
-   override def start(host: String, port: Int, cacheManager: EmbeddedCacheManager, masterThreads: Int, workerThreads: Int, idleTimeout: Int) {
-      this.host = host
-      this.port = port
-      this.masterThreads = masterThreads
-      this.workerThreads = workerThreads
+   override def start(properties: Properties, cacheManager: EmbeddedCacheManager) {
+      this.host = properties.getProperty(PROP_KEY_HOST)
+      this.port = properties.getProperty(PROP_KEY_PORT).toInt
+      this.masterThreads = properties.getProperty(PROP_KEY_MASTER_THREADS).toInt
+      this.workerThreads = properties.getProperty(PROP_KEY_WORKER_THREADS).toInt
       this.cacheManager = cacheManager
+      val idleTimeout = properties.getProperty(PROP_KEY_IDLE_TIMEOUT).toInt
+      val tcpNoDelay = properties.getProperty(PROP_KEY_TCP_NO_DELAY).toBoolean
 
       // Register rank calculator before starting any cache so that we can capture all view changes
       cacheManager.addListener(getRankCalculatorListener)
@@ -33,7 +37,8 @@
       val address =  new InetSocketAddress(host, port)
       val encoder = getEncoder
       val nettyEncoder = if (encoder != null) new EncoderAdapter(encoder) else null
-      transport = new NettyTransport(this, nettyEncoder, address, masterThreads, workerThreads, idleTimeout, threadNamePrefix)
+      transport = new NettyTransport(this, nettyEncoder, address, masterThreads, workerThreads, idleTimeout,
+         threadNamePrefix, tcpNoDelay)
       transport.start
    }
 

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala	2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala	2010-06-02 16:21:05 UTC (rev 1886)
@@ -1,8 +1,8 @@
 package org.infinispan.server.core
 
 import scala.collection.JavaConversions._
-import collection.mutable.HashMap
 import scala.collection.mutable
+import scala.collection.immutable
 import org.infinispan.util.Util
 import java.io.IOException
 import java.security.{PrivilegedAction, AccessController}
@@ -10,6 +10,7 @@
 import gnu.getopt.{Getopt, LongOpt}
 import org.infinispan.Version
 import org.infinispan.manager.{CacheManager, DefaultCacheManager}
+import java.util.Properties
 
 /**
  * Main class for server startup.
@@ -26,23 +27,27 @@
    val PROP_KEY_CACHE_CONFIG = "infinispan.server.cache.config"
    val PROP_KEY_PROTOCOL = "infinispan.server.protocol"
    val PROP_KEY_IDLE_TIMEOUT = "infinispan.server.idle.timeout"
+   val PROP_KEY_TCP_NO_DELAY = "infinispan.server.tcp.no.delay"
    val PORT_DEFAULT = 11211
    val HOST_DEFAULT = "127.0.0.1"
-   val MASTER_THREADS_DEFAULT = 0
-   val WORKER_THREADS_DEFAULT = 0
-   val IDLE_TIMEOUT_DEFAULT = -1
+   val MASTER_THREADS_DEFAULT = "0"
+   val WORKER_THREADS_DEFAULT = "0"
+   val IDLE_TIMEOUT_DEFAULT = "-1"
+   val TCP_NO_DELAY_DEFAULT = "true"
 
    /**
     * Server properties.  This object holds all of the required
     * information to get the server up and running. Use System
     * properties for defaults.
     */
-   private val props: mutable.Map[String, String] = {
+   private val props: Properties = {
       // Set default properties
-      val properties = new HashMap[String, String]
-      val sysProps = System.getProperties
-      for (property <- asIterator(sysProps.iterator))
-         properties.put(property._1, property._2)
+      val properties = new Properties(System.getProperties)
+      properties.setProperty(PROP_KEY_HOST, HOST_DEFAULT)
+      properties.setProperty(PROP_KEY_MASTER_THREADS, MASTER_THREADS_DEFAULT)
+      properties.setProperty(PROP_KEY_WORKER_THREADS, WORKER_THREADS_DEFAULT)
+      properties.setProperty(PROP_KEY_IDLE_TIMEOUT, IDLE_TIMEOUT_DEFAULT)
+      properties.setProperty(PROP_KEY_TCP_NO_DELAY, TCP_NO_DELAY_DEFAULT)
       properties
    }
    
@@ -79,52 +84,65 @@
       // First process the command line to pickup custom props/settings
       processCommandLine(args)
 
-      val host = if (props.get(PROP_KEY_HOST) == None) HOST_DEFAULT else props.get(PROP_KEY_HOST).get
-      val masterThreads = if (props.get(PROP_KEY_MASTER_THREADS) == None) MASTER_THREADS_DEFAULT else props.get(PROP_KEY_MASTER_THREADS).get.toInt
+      val properties = new Properties
+
+      val masterThreads = props.getProperty(PROP_KEY_MASTER_THREADS).toInt
       if (masterThreads < 0)
          throw new IllegalArgumentException("Master threads can't be lower than 0: " + masterThreads)
-
-      val workerThreads = if (props.get(PROP_KEY_WORKER_THREADS) == None) WORKER_THREADS_DEFAULT else props.get(PROP_KEY_WORKER_THREADS).get.toInt
+      
+      val workerThreads = props.getProperty(PROP_KEY_WORKER_THREADS).toInt
       if (workerThreads < 0)
          throw new IllegalArgumentException("Worker threads can't be lower than 0: " + masterThreads)
 
-      val configFile = props.get(PROP_KEY_CACHE_CONFIG)
-      var protocol = props.get(PROP_KEY_PROTOCOL)
-      if (protocol == None) {
+      var protocol = props.getProperty(PROP_KEY_PROTOCOL)
+      if (protocol == null) {
          System.err.println("ERROR: Please indicate protocol to run with -r parameter")
          showAndExit
       }
-      val idleTimeout = if (props.get(PROP_KEY_IDLE_TIMEOUT) == None) IDLE_TIMEOUT_DEFAULT else props.get(PROP_KEY_IDLE_TIMEOUT).get.toInt
+      
+      val idleTimeout = props.getProperty(PROP_KEY_IDLE_TIMEOUT).toInt
       if (idleTimeout < -1)
          throw new IllegalArgumentException("Idle timeout can't be lower than -1: " + idleTimeout)
 
+      val tcpNoDelay = props.getProperty(PROP_KEY_TCP_NO_DELAY)
+      try {
+         tcpNoDelay.toBoolean
+      } catch {
+         case n: NumberFormatException => {
+            throw new IllegalArgumentException("TCP no delay flag switch must be a boolean: " + tcpNoDelay)
+         }
+      }
+
       // TODO: move class name and protocol number to a resource file under the corresponding project
-      val clazz = protocol.get match {
+      val clazz = protocol match {
          case "memcached" => "org.infinispan.server.memcached.MemcachedServer"
          case "hotrod" => "org.infinispan.server.hotrod.HotRodServer"
          case "websocket" => "org.infinispan.server.websocket.WebSocketServer"
       }
+      val server = Util.getInstance(clazz).asInstanceOf[ProtocolServer]
+
       val port = {
-         if (props.get(PROP_KEY_PORT) == None) {
-            protocol.get match {
+         if (props.getProperty(PROP_KEY_PORT) == null) {
+            protocol match {
                case "memcached" => 11211
                case "hotrod" => 11311
                case "websocket" => 8181
             }
          } else {
-            props.get(PROP_KEY_PORT).get.toInt
+            props.getProperty(PROP_KEY_PORT).toInt
          }
       }
+      props.setProperty(PROP_KEY_PORT, port.toString)
 
-      var server = Util.getInstance(clazz).asInstanceOf[ProtocolServer]
-      val cacheManager = if (configFile == None) new DefaultCacheManager else new DefaultCacheManager(configFile.get)
+      val configFile = props.getProperty(PROP_KEY_CACHE_CONFIG)
+      val cacheManager = if (configFile == null) new DefaultCacheManager else new DefaultCacheManager(configFile)
       addShutdownHook(new ShutdownHook(server, cacheManager))
-      server.start(host, port, cacheManager, masterThreads, workerThreads, idleTimeout)
+      server.start(props, cacheManager)
    }
 
    private def processCommandLine(args: Array[String]) {
       programName = System.getProperty("program.name", "startServer")
-      var sopts = "-:hD:Vp:l:m:t:c:r:i:"
+      var sopts = "-:hD:Vp:l:m:t:c:r:i:n:"
       var lopts = Array(
          new LongOpt("help", LongOpt.NO_ARGUMENT, null, 'h'),
          new LongOpt("version", LongOpt.NO_ARGUMENT, null, 'V'),
@@ -134,7 +152,8 @@
          new LongOpt("worker_threads", LongOpt.REQUIRED_ARGUMENT, null, 't'),
          new LongOpt("cache_config", LongOpt.REQUIRED_ARGUMENT, null, 'c'),
          new LongOpt("protocol", LongOpt.REQUIRED_ARGUMENT, null, 'r'),
-         new LongOpt("idle_timeout", LongOpt.REQUIRED_ARGUMENT, null, 'i'))
+         new LongOpt("idle_timeout", LongOpt.REQUIRED_ARGUMENT, null, 'i'),
+         new LongOpt("tcp_no_delay", LongOpt.REQUIRED_ARGUMENT, null, 'n'))
       var getopt = new Getopt(programName, args, sopts, lopts)
       var code: Int = 0
       while ((({code = getopt.getopt; code})) != -1) {
@@ -146,13 +165,14 @@
                Version.printFullVersionInformation
                System.exit(0)
             }
-            case 'p' => props.put(PROP_KEY_PORT, getopt.getOptarg)
-            case 'l' => props.put(PROP_KEY_HOST, getopt.getOptarg)
-            case 'm' => props.put(PROP_KEY_MASTER_THREADS, getopt.getOptarg)
-            case 't' => props.put(PROP_KEY_WORKER_THREADS, getopt.getOptarg)
-            case 'c' => props.put(PROP_KEY_CACHE_CONFIG, getopt.getOptarg)
-            case 'r' => props.put(PROP_KEY_PROTOCOL, getopt.getOptarg)
-            case 'i' => props.put(PROP_KEY_IDLE_TIMEOUT, getopt.getOptarg)
+            case 'p' => props.setProperty(PROP_KEY_PORT, getopt.getOptarg)
+            case 'l' => props.setProperty(PROP_KEY_HOST, getopt.getOptarg)
+            case 'm' => props.setProperty(PROP_KEY_MASTER_THREADS, getopt.getOptarg)
+            case 't' => props.setProperty(PROP_KEY_WORKER_THREADS, getopt.getOptarg)
+            case 'c' => props.setProperty(PROP_KEY_CACHE_CONFIG, getopt.getOptarg)
+            case 'r' => props.setProperty(PROP_KEY_PROTOCOL, getopt.getOptarg)
+            case 'i' => props.setProperty(PROP_KEY_IDLE_TIMEOUT, getopt.getOptarg)
+            case 'n' => props.setProperty(PROP_KEY_TCP_NO_DELAY, getopt.getOptarg)
             case 'D' => {
                val arg = getopt.getOptarg
                var name = ""
@@ -208,6 +228,8 @@
       println("                                       If no new messages have been read within this time, the server disconnects the channel.")
       println("                                       Passing -1 disables idle timeout.")
       println
+      println("    -n, --tcp_no_delay=[true|false]   TCP no delay flag switch (default: true).")
+      println
       println("    -D<name>[=<value>]                 Set a system property")
       println
       System.exit(0)

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala	2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala	2010-06-02 16:21:05 UTC (rev 1886)
@@ -2,6 +2,7 @@
 
 import transport.{Decoder, Encoder}
 import org.infinispan.manager.{EmbeddedCacheManager}
+import java.util.Properties
 
 /**
  * // TODO: Document this
@@ -9,8 +10,15 @@
  * @since 4.1
  */
 trait ProtocolServer {
-   def start(host: String, port: Int, cacheManager: EmbeddedCacheManager, masterThreads: Int, workerThreads: Int, idleTimeout: Int)
+
+   /**
+    * Takes a Properties object rather than a Map so that Java code can call this method easily.
+    */
+   def start(properties: Properties, cacheManager: EmbeddedCacheManager)
+
    def stop
+
    def getEncoder: Encoder
+
    def getDecoder: Decoder
 }

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-06-02 16:21:05 UTC (rev 1886)
@@ -18,7 +18,7 @@
  */
 class NettyTransport(server: ProtocolServer, encoder: ChannelDownstreamHandler,
                      address: SocketAddress, masterThreads: Int, workerThreads: Int,
-                     idleTimeout: Int, threadNamePrefix: String) extends Transport {
+                     idleTimeout: Int, threadNamePrefix: String, tcpNoDelay: Boolean) extends Transport {
    import NettyTransport._
 
    private val serverChannels = new DefaultChannelGroup(threadNamePrefix + "-Channels")
@@ -70,6 +70,7 @@
       })
       val bootstrap = new ServerBootstrap(factory);
       bootstrap.setPipelineFactory(pipeline);
+      bootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
       val ch = bootstrap.bind(address);
       serverChannels.add(ch);
    }

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala	2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala	2010-06-02 16:21:05 UTC (rev 1886)
@@ -9,11 +9,11 @@
 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent
 import scala.collection.JavaConversions._
 import java.util.concurrent.TimeUnit._
-import java.util.Random
 import org.infinispan.util.Util
 import org.infinispan.{CacheException, Cache}
 import org.infinispan.remoting.transport.Address
-import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.manager.EmbeddedCacheManager
+import java.util.{Properties, Random};
 
 /**
  * // TODO: Document this
@@ -34,8 +34,8 @@
 
    override def getDecoder: Decoder = new HotRodDecoder(getCacheManager)
 
-   override def start(host: String, port: Int, cacheManager: EmbeddedCacheManager, masterThreads: Int, workerThreads: Int, idleTimeout: Int) {
-      super.start(host, port, cacheManager, masterThreads, workerThreads, idleTimeout)
+   override def start(properties: Properties, cacheManager: EmbeddedCacheManager) {
+      super.start(properties, cacheManager)
       // Start defined caches to avoid issues with lazily started caches
       for (cacheName <- asIterator(cacheManager.getCacheNames.iterator))
          cacheManager.getCache(cacheName)
@@ -43,7 +43,7 @@
       isClustered = cacheManager.getGlobalConfiguration.getTransportClass != null
       // If clustered, set up a cache for topology information
       if (isClustered)
-         addSelfToTopologyView(host, port, cacheManager)
+         addSelfToTopologyView(getHost, getPort, cacheManager)
    }
 
    private def addSelfToTopologyView(host: String, port: Int, cacheManager: EmbeddedCacheManager) {

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala	2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala	2010-06-02 16:21:05 UTC (rev 1886)
@@ -3,7 +3,6 @@
 import java.util.concurrent.atomic.AtomicInteger
 import java.lang.reflect.Method
 import org.infinispan.server.core.Logging
-import java.util.Arrays
 import org.infinispan.server.hotrod.OperationStatus._
 import org.testng.Assert._
 import org.infinispan.util.Util
@@ -11,6 +10,8 @@
 import org.infinispan.config.Configuration.CacheMode
 import org.infinispan.config.Configuration
 import org.infinispan.manager.EmbeddedCacheManager
+import org.infinispan.server.core.Main._
+import java.util.{Properties, Arrays}
 
 /**
  * // TODO: Document this
@@ -36,10 +37,21 @@
             cacheManager.defineConfiguration(TopologyCacheName, createTopologyCacheConfig)
          }
       }
-      server.start(host, port, manager, 0, 0, idleTimeout)
+      server.start(getProperties(host, port, idleTimeout), manager)
       server
    }
 
+   private def getProperties(host: String, port: Int, idleTimeout: Int): Properties = {
+      val properties = new Properties
+      properties.setProperty(PROP_KEY_HOST, host)
+      properties.setProperty(PROP_KEY_PORT, port.toString)
+      properties.setProperty(PROP_KEY_MASTER_THREADS, "0")
+      properties.setProperty(PROP_KEY_WORKER_THREADS, "0")
+      properties.setProperty(PROP_KEY_IDLE_TIMEOUT, idleTimeout.toString)
+      properties.setProperty(PROP_KEY_TCP_NO_DELAY, "true")
+      properties
+   }
+
    def startCrashingHotRodServer(manager: EmbeddedCacheManager, port: Int): HotRodServer = {
       val server = new HotRodServer {
          import HotRodServer._
@@ -52,7 +64,7 @@
             // but has been evicted from JGroups cluster.
          }
       }
-      server.start(host, port, manager, 0, 0, 0)
+      server.start(getProperties(host, port, 0), manager)
       server
    }
 

Modified: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala	2010-06-02 10:06:23 UTC (rev 1885)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala	2010-06-02 16:21:05 UTC (rev 1886)
@@ -2,12 +2,13 @@
 
 import java.lang.reflect.Method
 import net.spy.memcached.{DefaultConnectionFactory, MemcachedClient}
-import java.util.Arrays
 import java.net.InetSocketAddress
 import java.util.concurrent.atomic.AtomicInteger
 import org.infinispan.server.core.transport.Decoder
 import org.infinispan.server.memcached.{MemcachedDecoder, MemcachedValue, MemcachedServer}
 import org.infinispan.manager.EmbeddedCacheManager
+import java.util.{Properties, Arrays}
+import org.infinispan.server.core.Main._
 
 /**
  * // TODO: Document this
@@ -37,10 +38,21 @@
 
    def startMemcachedTextServer(cacheManager: EmbeddedCacheManager, port: Int): MemcachedServer = {
       val server = new MemcachedServer
-      server.start(host, port, cacheManager, 0, 0, 0)
+      server.start(getProperties(host, port), cacheManager)
       server
    }
 
+   private def getProperties(host: String, port: Int): Properties = {
+      val properties = new Properties
+      properties.setProperty(PROP_KEY_HOST, host)
+      properties.setProperty(PROP_KEY_PORT, port.toString)
+      properties.setProperty(PROP_KEY_MASTER_THREADS, "0")
+      properties.setProperty(PROP_KEY_WORKER_THREADS, "0")
+      properties.setProperty(PROP_KEY_IDLE_TIMEOUT, "0")
+      properties.setProperty(PROP_KEY_TCP_NO_DELAY, "true")
+      properties
+   }
+
    def startMemcachedTextServer(cacheManager: EmbeddedCacheManager, cacheName: String): MemcachedServer = {
       startMemcachedTextServer(cacheManager, UniquePortThreadLocal.get.intValue, cacheName)
    }
@@ -52,7 +64,7 @@
 
          override def startDefaultCache = getCacheManager.getCache(cacheName)
       }
-      server.start(host, port, cacheManager, 0, 0, 0)
+      server.start(getProperties(host, port), cacheManager)
       server
    }
    



More information about the infinispan-commits mailing list