[infinispan-commits] Infinispan SVN: r1898 - in trunk: server/core/src/main/scala/org/infinispan/server/core and 4 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Jun 10 11:31:41 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-06-10 11:31:40 -0400 (Thu, 10 Jun 2010)
New Revision: 1898

Modified:
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerRestartTest.java
   trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
   trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
   trunk/server/websocket/src/main/java/org/infinispan/server/websocket/WebSocketServer.java
Log:
[ISPN-483] (Send/receive buffer sizes configurable) Done.

Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerRestartTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerRestartTest.java	2010-06-10 15:25:05 UTC (rev 1897)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerRestartTest.java	2010-06-10 15:31:40 UTC (rev 1898)
@@ -62,10 +62,12 @@
       Properties properties = new Properties();
       properties.setProperty("infinispan.server.host", "localhost");
       properties.setProperty("infinispan.server.port", Integer.toString(port));
-      properties.setProperty("infinispan.server.master.threads", "2");
-      properties.setProperty("infinispan.server.worker.threads", "2");
-      properties.setProperty("infinispan.server.idle.timeout", "20000");
-      properties.setProperty("infinispan.server.tcp.no.delay", "true");
+      properties.setProperty("infinispan.server.master_threads", "2");
+      properties.setProperty("infinispan.server.worker_threads", "2");
+      properties.setProperty("infinispan.server.idle_timeout", "20000");
+      properties.setProperty("infinispan.server.tcp_no_delay", "true");
+      properties.setProperty("infinispan.server.send_buf_size", "15000");
+      properties.setProperty("infinispan.server.recv_buf_size", "25000");
       hotrodServer.start(properties, cacheManager);
 
       Thread.sleep(3000);

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala	2010-06-10 15:25:05 UTC (rev 1897)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala	2010-06-10 15:31:40 UTC (rev 1898)
@@ -24,11 +24,13 @@
    override def start(properties: Properties, cacheManager: EmbeddedCacheManager) {
       this.host = properties.getProperty(PROP_KEY_HOST)
       this.port = properties.getProperty(PROP_KEY_PORT).toInt
-      this.masterThreads = properties.getProperty(PROP_KEY_MASTER_THREADS).toInt
-      this.workerThreads = properties.getProperty(PROP_KEY_WORKER_THREADS).toInt
+      this.masterThreads = properties.getProperty(PROP_KEY_MASTER_THREADS, MASTER_THREADS_DEFAULT).toInt
+      this.workerThreads = properties.getProperty(PROP_KEY_WORKER_THREADS, WORKER_THREADS_DEFAULT).toInt
       this.cacheManager = cacheManager
-      val idleTimeout = properties.getProperty(PROP_KEY_IDLE_TIMEOUT).toInt
-      val tcpNoDelay = properties.getProperty(PROP_KEY_TCP_NO_DELAY).toBoolean
+      val idleTimeout = properties.getProperty(PROP_KEY_IDLE_TIMEOUT, IDLE_TIMEOUT_DEFAULT).toInt
+      val tcpNoDelay = properties.getProperty(PROP_KEY_TCP_NO_DELAY, TCP_NO_DELAY_DEFAULT).toBoolean
+      val sendBufSize = properties.getProperty(PROP_KEY_SEND_BUF_SIZE, SEND_BUF_SIZE_DEFAULT).toInt
+      val recvBufSize = properties.getProperty(PROP_KEY_RECV_BUF_SIZE, RECV_BUF_SIZE_DEFAULT).toInt
 
       // Register rank calculator before starting any cache so that we can capture all view changes
       cacheManager.addListener(getRankCalculatorListener)
@@ -38,7 +40,7 @@
       val encoder = getEncoder
       val nettyEncoder = if (encoder != null) new EncoderAdapter(encoder) else null
       transport = new NettyTransport(this, nettyEncoder, address, masterThreads, workerThreads, idleTimeout,
-         threadNamePrefix, tcpNoDelay)
+         threadNamePrefix, tcpNoDelay, sendBufSize, recvBufSize)
       transport.start
    }
 

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala	2010-06-10 15:25:05 UTC (rev 1897)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala	2010-06-10 15:31:40 UTC (rev 1898)
@@ -22,18 +22,22 @@
    
    val PROP_KEY_PORT = "infinispan.server.port"
    val PROP_KEY_HOST = "infinispan.server.host"
-   val PROP_KEY_MASTER_THREADS = "infinispan.server.master.threads"
-   val PROP_KEY_WORKER_THREADS = "infinispan.server.worker.threads"
-   val PROP_KEY_CACHE_CONFIG = "infinispan.server.cache.config"
+   val PROP_KEY_MASTER_THREADS = "infinispan.server.master_threads"
+   val PROP_KEY_WORKER_THREADS = "infinispan.server.worker_threads"
+   val PROP_KEY_CACHE_CONFIG = "infinispan.server.cache_config"
    val PROP_KEY_PROTOCOL = "infinispan.server.protocol"
-   val PROP_KEY_IDLE_TIMEOUT = "infinispan.server.idle.timeout"
-   val PROP_KEY_TCP_NO_DELAY = "infinispan.server.tcp.no.delay"
+   val PROP_KEY_IDLE_TIMEOUT = "infinispan.server.idle_timeout"
+   val PROP_KEY_TCP_NO_DELAY = "infinispan.server.tcp_no_delay"
+   val PROP_KEY_SEND_BUF_SIZE = "infinispan.server.send_buf_size"
+   val PROP_KEY_RECV_BUF_SIZE = "infinispan.server.recv_buf_size"
    val PORT_DEFAULT = 11211
    val HOST_DEFAULT = "127.0.0.1"
    val MASTER_THREADS_DEFAULT = "0"
    val WORKER_THREADS_DEFAULT = "0"
    val IDLE_TIMEOUT_DEFAULT = "-1"
    val TCP_NO_DELAY_DEFAULT = "true"
+   val SEND_BUF_SIZE_DEFAULT = "0"
+   val RECV_BUF_SIZE_DEFAULT = "0"
 
    /**
     * Server properties.  This object holds all of the required
@@ -48,6 +52,8 @@
       properties.setProperty(PROP_KEY_WORKER_THREADS, WORKER_THREADS_DEFAULT)
       properties.setProperty(PROP_KEY_IDLE_TIMEOUT, IDLE_TIMEOUT_DEFAULT)
       properties.setProperty(PROP_KEY_TCP_NO_DELAY, TCP_NO_DELAY_DEFAULT)
+      properties.setProperty(PROP_KEY_SEND_BUF_SIZE, SEND_BUF_SIZE_DEFAULT)
+      properties.setProperty(PROP_KEY_RECV_BUF_SIZE, RECV_BUF_SIZE_DEFAULT)
       properties
    }
    
@@ -113,6 +119,16 @@
          }
       }
 
+      val sendBufSize = props.getProperty(PROP_KEY_SEND_BUF_SIZE).toInt
+      if (sendBufSize < 0) {
+         throw new IllegalArgumentException("Send buffer size can't be lower than 0: " + sendBufSize)
+      }
+
+      val recvBufSize = props.getProperty(PROP_KEY_SEND_BUF_SIZE).toInt
+      if (recvBufSize < 0) {
+         throw new IllegalArgumentException("Send buffer size can't be lower than 0: " + sendBufSize)
+      }
+
       // TODO: move class name and protocol number to a resource file under the corresponding project
       val clazz = protocol match {
          case "memcached" => "org.infinispan.server.memcached.MemcachedServer"
@@ -142,7 +158,7 @@
 
    private def processCommandLine(args: Array[String]) {
       programName = System.getProperty("program.name", "startServer")
-      var sopts = "-:hD:Vp:l:m:t:c:r:i:n:"
+      var sopts = "-:hD:Vp:l:m:t:c:r:i:n:s:e:"
       var lopts = Array(
          new LongOpt("help", LongOpt.NO_ARGUMENT, null, 'h'),
          new LongOpt("version", LongOpt.NO_ARGUMENT, null, 'V'),
@@ -153,7 +169,10 @@
          new LongOpt("cache_config", LongOpt.REQUIRED_ARGUMENT, null, 'c'),
          new LongOpt("protocol", LongOpt.REQUIRED_ARGUMENT, null, 'r'),
          new LongOpt("idle_timeout", LongOpt.REQUIRED_ARGUMENT, null, 'i'),
-         new LongOpt("tcp_no_delay", LongOpt.REQUIRED_ARGUMENT, null, 'n'))
+         new LongOpt("tcp_no_delay", LongOpt.REQUIRED_ARGUMENT, null, 'n'),
+         new LongOpt("send_buf_size", LongOpt.REQUIRED_ARGUMENT, null, 's'),
+         new LongOpt("recv_buf_size", LongOpt.REQUIRED_ARGUMENT, null, 'e')
+         )
       var getopt = new Getopt(programName, args, sopts, lopts)
       var code: Int = 0
       while ((({code = getopt.getopt; code})) != -1) {
@@ -228,8 +247,12 @@
       println("                                       If no new messages have been read within this time, the server disconnects the channel.")
       println("                                       Passing -1 disables idle timeout.")
       println
-      println("    -n, --tcp_no_delay=[true|false]   TCP no delay flag switch (default: true).")
+      println("    -n, --tcp_no_delay=[true|false]    TCP no delay flag switch (default: true).")
       println
+      println("    -s, --send_buf_size=<num>          Send buffer size (default: as defined by the OS).")
+      println
+      println("    -e, --recv_buf_size=<>             Receive buffer size (default: as defined by the OS).")
+      println
       println("    -D<name>[=<value>]                 Set a system property")
       println
       System.exit(0)

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-06-10 15:25:05 UTC (rev 1897)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-06-10 15:31:40 UTC (rev 1898)
@@ -18,7 +18,8 @@
  */
 class NettyTransport(server: ProtocolServer, encoder: ChannelDownstreamHandler,
                      address: SocketAddress, masterThreads: Int, workerThreads: Int,
-                     idleTimeout: Int, threadNamePrefix: String, tcpNoDelay: Boolean) extends Transport {
+                     idleTimeout: Int, threadNamePrefix: String, tcpNoDelay: Boolean,
+                     sendBufSize: Int, recvBufSize: Int) extends Transport {
    import NettyTransport._
 
    private val serverChannels = new DefaultChannelGroup(threadNamePrefix + "-Channels")
@@ -68,11 +69,16 @@
             name
          }
       })
-      val bootstrap = new ServerBootstrap(factory);
-      bootstrap.setPipelineFactory(pipeline);
-      bootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
-      val ch = bootstrap.bind(address);
-      serverChannels.add(ch);
+      val bootstrap = new ServerBootstrap(factory)
+      bootstrap.setPipelineFactory(pipeline)
+      bootstrap.setOption("child.tcpNoDelay", tcpNoDelay)
+      if (sendBufSize > 0)
+         bootstrap.setOption("child.sendBufferSize", sendBufSize)
+      if (recvBufSize > 0)
+         bootstrap.setOption("receiveBufferSize", recvBufSize)
+
+      val ch = bootstrap.bind(address)
+      serverChannels.add(ch)
    }
 
    override def stop {
@@ -105,4 +111,4 @@
 
 }
 
-object NettyTransport extends Logging
\ No newline at end of file
+object NettyTransport extends Logging

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala	2010-06-10 15:25:05 UTC (rev 1897)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala	2010-06-10 15:31:40 UTC (rev 1898)
@@ -45,10 +45,7 @@
       val properties = new Properties
       properties.setProperty(PROP_KEY_HOST, host)
       properties.setProperty(PROP_KEY_PORT, port.toString)
-      properties.setProperty(PROP_KEY_MASTER_THREADS, "0")
-      properties.setProperty(PROP_KEY_WORKER_THREADS, "0")
       properties.setProperty(PROP_KEY_IDLE_TIMEOUT, idleTimeout.toString)
-      properties.setProperty(PROP_KEY_TCP_NO_DELAY, "true")
       properties
    }
 

Modified: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala	2010-06-10 15:25:05 UTC (rev 1897)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala	2010-06-10 15:31:40 UTC (rev 1898)
@@ -46,10 +46,6 @@
       val properties = new Properties
       properties.setProperty(PROP_KEY_HOST, host)
       properties.setProperty(PROP_KEY_PORT, port.toString)
-      properties.setProperty(PROP_KEY_MASTER_THREADS, "0")
-      properties.setProperty(PROP_KEY_WORKER_THREADS, "0")
-      properties.setProperty(PROP_KEY_IDLE_TIMEOUT, "0")
-      properties.setProperty(PROP_KEY_TCP_NO_DELAY, "true")
       properties
    }
 
@@ -73,4 +69,4 @@
 object UniquePortThreadLocal extends ThreadLocal[Int] {
    private val uniqueAddr = new AtomicInteger(11211)
    override def initialValue: Int = uniqueAddr.getAndAdd(100)
-}
\ No newline at end of file
+}

Modified: trunk/server/websocket/src/main/java/org/infinispan/server/websocket/WebSocketServer.java
===================================================================
--- trunk/server/websocket/src/main/java/org/infinispan/server/websocket/WebSocketServer.java	2010-06-10 15:25:05 UTC (rev 1897)
+++ trunk/server/websocket/src/main/java/org/infinispan/server/websocket/WebSocketServer.java	2010-06-10 15:31:40 UTC (rev 1898)
@@ -81,8 +81,8 @@
    public void start(Properties properties, EmbeddedCacheManager cacheManager) {
       String host = properties.getProperty("infinispan.server.host");
       int port = Integer.parseInt(properties.getProperty("infinispan.server.port"));
-      int masterThreads = Integer.parseInt(properties.getProperty("infinispan.server.master.threads"));
-      int workerThreads = Integer.parseInt(properties.getProperty("infinispan.server.worker.threads"));
+      int masterThreads = Integer.parseInt(properties.getProperty("infinispan.server.master_threads"));
+      int workerThreads = Integer.parseInt(properties.getProperty("infinispan.server.worker_threads"));
 
       InetSocketAddress address = new InetSocketAddress(host, port);
 



More information about the infinispan-commits mailing list