[infinispan-issues] [JBoss JIRA] Updated: (ISPN-955) Allow HotRod server to be passed a cache manager class to be used instead of DefaultCacheManager

Octavian Florescu (JIRA) jira-events at lists.jboss.org
Fri Feb 25 18:12:05 EST 2011


     [ https://issues.jboss.org/browse/ISPN-955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Octavian Florescu updated ISPN-955:
-----------------------------------

    Attachment: Main.scala


> Allow HotRod server to be passed a cache manager class to be used instead of DefaultCacheManager
> ------------------------------------------------------------------------------------------------
>
>                 Key: ISPN-955
>                 URL: https://issues.jboss.org/browse/ISPN-955
>             Project: Infinispan
>          Issue Type: Enhancement
>          Components: Cache Server
>    Affects Versions: 5.0.0.ALPHA3
>         Environment: 4.2.1.CR1
>            Reporter: Octavian Florescu
>            Assignee: Manik Surtani
>         Attachments: Main.scala
>
>
> The cache manager class used by HotRod server is not pluggable. org.infinispan.server.core.Main always uses DefaultCacheManager.
> Allow passing in the name of a user supplied class implementing EmbeddedCacheManager thought some mean (eg a param).
> I implemented this functionality already (see bellow, look for the addition of -f param), so I hope something like this makes it into 5.0.0
> package org.infinispan.server.core
> import java.io.IOException
> import java.security.{PrivilegedAction, AccessController}
> import java.util.concurrent.{ThreadFactory, ExecutionException, Callable, Executors}
> import gnu.getopt.{Getopt, LongOpt}
> import org.infinispan.Version
> import java.util.Properties
> import org.infinispan.util.{TypedProperties, Util}
> import org.infinispan.config.{Configuration, GlobalConfiguration}
> import org.infinispan.manager.{EmbeddedCacheManager, CacheContainer, DefaultCacheManager}
> import org.infinispan.config.GlobalConfiguration.ShutdownHookBehavior
> /**
>  * Main class for server startup.
>  *
>  * @author Galder Zamarreño
>  * @since 4.1
>  */
> object Main extends Logging {
>    
>    val PROP_KEY_PORT = "infinispan.server.port"
>    val PROP_KEY_HOST = "infinispan.server.host"
>    val PROP_KEY_MASTER_THREADS = "infinispan.server.master_threads"
>    val PROP_KEY_WORKER_THREADS = "infinispan.server.worker_threads"
>    val PROP_KEY_CACHE_CONFIG = "infinispan.server.cache_config"
>    val PROP_KEY_PROTOCOL = "infinispan.server.protocol"
>    val PROP_KEY_IDLE_TIMEOUT = "infinispan.server.idle_timeout"
>    val PROP_KEY_TCP_NO_DELAY = "infinispan.server.tcp_no_delay"
>    val PROP_KEY_SEND_BUF_SIZE = "infinispan.server.send_buf_size"
>    val PROP_KEY_RECV_BUF_SIZE = "infinispan.server.recv_buf_size"
>    val PROP_KEY_PROXY_HOST = "infinispan.server.proxy_host"
>    val PROP_KEY_PROXY_PORT = "infinispan.server.proxy_port"
>    val PROP_KEY_CACHE_MANAGER_CLASS = "infinispan.server.cache_manager_class"	   
>    val HOST_DEFAULT = "127.0.0.1"
>    val MASTER_THREADS_DEFAULT = 0
>    val WORKER_THREADS_DEFAULT = 0
>    val IDLE_TIMEOUT_DEFAULT = -1
>    val TCP_NO_DELAY_DEFAULT = true
>    val SEND_BUF_SIZE_DEFAULT = 0
>    val RECV_BUF_SIZE_DEFAULT = 0
>    /**
>     * Server properties.  This object holds all of the required
>     * information to get the server up and running. Use System
>     * properties for defaults.
>     */
>    private val props: Properties = new TypedProperties(System.getProperties)
>    
>    private var server: ProtocolServer = _
>    private var cacheManager: EmbeddedCacheManager = _
>    def getServer = server
>    def getCacheManager = cacheManager
>    def main(args: Array[String]) {
>       info("Start main with args: {0}", args.mkString(", "))
>       val worker = new Callable[Void] {
>          override def call = {
>             try {
>                boot(args)
>             }
>             catch {
>                case e: Exception => {
>                   System.err.println("Failed to boot JBoss:")
>                   e.printStackTrace
>                   throw e
>                }
>             }
>             null
>          }
>       }
>       val f = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
>          override def newThread(r: Runnable): Thread = {
>             // TODO Maybe create thread names based on the protocol run
>             return new Thread(r, "InfinispanServer-Main")
>          }
>       }).submit(worker)
>       f.get
>    }
>    def boot(args: Array[String]) {
>       // First process the command line to pickup custom props/settings
>       processCommandLine(args)
>       var protocol = props.getProperty(PROP_KEY_PROTOCOL)
>       if (protocol == null) {
>          System.err.println("ERROR: Please indicate protocol to run with -r parameter")
>          showAndExit
>       }
>       // TODO: move class name and protocol number to a resource file under the corresponding project
>       val clazz = protocol match {
>          case "memcached" => "org.infinispan.server.memcached.MemcachedServer"
>          case "hotrod" => "org.infinispan.server.hotrod.HotRodServer"
>          case "websocket" => "org.infinispan.server.websocket.WebSocketServer"
>       }
>       server = Util.getInstance(clazz).asInstanceOf[ProtocolServer]
>       val configFile = props.getProperty(PROP_KEY_CACHE_CONFIG)
>       cacheManager = instantiateCacheManager(configFile)
>       // Servers need a shutdown hook to close down network layer, so there's no need for an extra shutdown hook.
>       // Removing Infinispan's hook also makes shutdown procedures for server and cache manager sequential, avoiding
>       // issues with having the JGroups channel disconnected before it's removed itself from the topology view.
>       cacheManager.getGlobalConfiguration.setShutdownHookBehavior(ShutdownHookBehavior.DONT_REGISTER)
>       addShutdownHook(new ShutdownHook(server, cacheManager))
>       server.start(props, cacheManager)
>    }
>    
>    private def instantiateCacheManager(configFile: String): EmbeddedCacheManager = {
> 	   var clazzName = props.getProperty(PROP_KEY_CACHE_MANAGER_CLASS)
> 	   val clazz = if (null == clazzName) {
> 	  	   classOf[DefaultCacheManager]
> 	   } else {
> 	  	  Class.forName(clazzName) 
> 	   }
> 	   if (null == configFile) {
> 	  	   createCacheManagerNoConfig(clazz.asInstanceOf[Class[EmbeddedCacheManager]]) 
> 	   } else {
> 	  	   createCacheManager(clazz.asInstanceOf[Class[EmbeddedCacheManager]], configFile)
> 	   }
>    }
>    
>    private def createCacheManagerNoConfig(clazz: Class[EmbeddedCacheManager]): EmbeddedCacheManager = {
> 	  val globalCfg = new GlobalConfiguration
>       globalCfg.setExposeGlobalJmxStatistics(true)
>       val defaultCfg = new Configuration
>       defaultCfg.setExposeJmxStatistics(true)
>       
>       try {
>     	  val constructor = clazz.getConstructor(classOf[GlobalConfiguration], classOf[Configuration])
>     	  return constructor.newInstance(globalCfg, defaultCfg)
>       } catch {
> 	  	   case e: NoSuchMethodException => {
> 	  	  	   System.err.println(clazz.toString + " does not have a constructor that takes GlobalConfiguration and Configuration instances")
> 			   System.exit(0)
> 			   return null
> 	  	   }
> 	   }
>    }
>    private def createCacheManager(clazz: Class[EmbeddedCacheManager], configFile: String): EmbeddedCacheManager = {
> 	   try {
> 		   val constructor = clazz.getConstructor(classOf[String])
> 		   return constructor.newInstance(configFile)
> 	   } catch {
> 	  	   case e: NoSuchMethodException => {
> 	  	  	   System.err.println(clazz.toString + " does not have a constructor that takes a config file path")
> 			   System.exit(0)
> 			   return null
> 	  	   }
> 	   }
>    }
>    private def processCommandLine(args: Array[String]) {
>       val sopts = "-:hD:Vp:l:m:t:c:r:i:n:s:e:o:x:f:"
>       val lopts = Array(
>          new LongOpt("help", LongOpt.NO_ARGUMENT, null, 'h'),
>          new LongOpt("version", LongOpt.NO_ARGUMENT, null, 'V'),
>          new LongOpt("port", LongOpt.REQUIRED_ARGUMENT, null, 'p'),
>          new LongOpt("host", LongOpt.REQUIRED_ARGUMENT, null, 'l'),
>          new LongOpt("master_threads", LongOpt.REQUIRED_ARGUMENT, null, 'm'),
>          new LongOpt("worker_threads", LongOpt.REQUIRED_ARGUMENT, null, 't'),
>          new LongOpt("cache_config", LongOpt.REQUIRED_ARGUMENT, null, 'c'),
>          new LongOpt("protocol", LongOpt.REQUIRED_ARGUMENT, null, 'r'),
>          new LongOpt("idle_timeout", LongOpt.REQUIRED_ARGUMENT, null, 'i'),
>          new LongOpt("tcp_no_delay", LongOpt.REQUIRED_ARGUMENT, null, 'n'),
>          new LongOpt("send_buf_size", LongOpt.REQUIRED_ARGUMENT, null, 's'),
>          new LongOpt("recv_buf_size", LongOpt.REQUIRED_ARGUMENT, null, 'e'),
>          new LongOpt("proxy_host", LongOpt.REQUIRED_ARGUMENT, null, 'o'),
>          new LongOpt("proxy_port", LongOpt.REQUIRED_ARGUMENT, null, 'x'),
>          new LongOpt("cache_manager_class", LongOpt.REQUIRED_ARGUMENT, null, 'f')
>          )
>       val getopt = new Getopt("startServer", args, sopts, lopts)
>       var code: Int = 0
>       while ((({code = getopt.getopt; code})) != -1) {
>          code match {
>             case ':' | '?' => System.exit(1)
>             case 1 => System.err.println("startServer: unused non-option argument: " + getopt.getOptarg)
>             case 'h' => showAndExit
>             case 'V' => {
>                Version.printFullVersionInformation
>                System.exit(0)
>             }
>             case 'p' => props.setProperty(PROP_KEY_PORT, getopt.getOptarg)
>             case 'l' => props.setProperty(PROP_KEY_HOST, getopt.getOptarg)
>             case 'm' => props.setProperty(PROP_KEY_MASTER_THREADS, getopt.getOptarg)
>             case 't' => props.setProperty(PROP_KEY_WORKER_THREADS, getopt.getOptarg)
>             case 'c' => props.setProperty(PROP_KEY_CACHE_CONFIG, getopt.getOptarg)
>             case 'r' => props.setProperty(PROP_KEY_PROTOCOL, getopt.getOptarg)
>             case 'i' => props.setProperty(PROP_KEY_IDLE_TIMEOUT, getopt.getOptarg)
>             case 'n' => props.setProperty(PROP_KEY_TCP_NO_DELAY, getopt.getOptarg)
>             case 's' => props.setProperty(PROP_KEY_SEND_BUF_SIZE, getopt.getOptarg)
>             case 'e' => props.setProperty(PROP_KEY_RECV_BUF_SIZE, getopt.getOptarg)
>             case 'o' => props.setProperty(PROP_KEY_PROXY_HOST, getopt.getOptarg)
>             case 'x' => props.setProperty(PROP_KEY_PROXY_PORT, getopt.getOptarg)
>             case 'f' => props.setProperty(PROP_KEY_CACHE_MANAGER_CLASS, getopt.getOptarg)
>             case 'D' => {
>                val arg = getopt.getOptarg
>                var name = ""
>                var value = ""
>                var i = arg.indexOf("=")
>                if (i == -1) {
>                   name = arg
>                   value = "true"
>                } else {
>                   name = arg.substring(0, i)
>                   value = arg.substring(i + 1, arg.length)
>                }
>                System.setProperty(name, value)
>             }
>             case _ => throw new Exception("unhandled option code: " + code)
>          }
>       }
>    }
>    private def addShutdownHook(shutdownHook: Thread): Unit = {
>       AccessController.doPrivileged(new PrivilegedAction[Void] {
>          override def run = {
>             Runtime.getRuntime.addShutdownHook(shutdownHook)
>             null
>          }
>       })
>    }
>    private def showAndExit {
>       println("usage: startServer [options]")
>       println
>       println("options:")
>       println("    -h, --help                         Show this help message")
>       println
>       println("    -V, --version                      Show version information")
>       println
>       println("    --                                 Stop processing options")
>       println
>       println("    -p, --port=<num>                   TCP port number to listen on (default: 11211 for Memcached, 11222 for Hot Rod and 8181 for WebSocket server)")
>       println
>       println("    -l, --host=<host or ip>            Interface to listen on (default: 127.0.0.1, localhost)")
>       println
>       println("    -m, --master_threads=<num>         Number of threads accepting incoming connections (default: unlimited while resources are available)")
>       println
>       println("    -t, --work_threads=<num>           Number of threads processing incoming requests and sending responses (default: unlimited while resources are available)")
>       println
>       println("    -c, --cache_config=<filename>      Cache configuration file (default: creates cache with default values)")
>       println
>       println("    -r, --protocol=                    Protocol to understand by the server. This is a mandatory option and you should choose one of these options")
>       println("          [memcached|hotrod|websocket]")
>       println
>       println("    -i, --idle_timeout=<num>           Idle read timeout, in seconds, used to detect stale connections (default: -1).")
>       println("                                       If no new messages have been read within this time, the server disconnects the channel.")
>       println("                                       Passing -1 disables idle timeout.")
>       println
>       println("    -n, --tcp_no_delay=[true|false]    TCP no delay flag switch (default: true).")
>       println
>       println("    -s, --send_buf_size=<num>          Send buffer size (default: as defined by the OS).")
>       println
>       println("    -e, --recv_buf_size=<num>          Receive buffer size (default: as defined by the OS).")
>       println
>       println("    -o, --proxy_host=<host or ip>      Host address to expose in topology information sent to clients. If not present, it defaults to configured host. Servers that do not transmit topology information ignore this setting.")
>       println
>       println("    -x, --proxy_port=<num>             Port to expose in topology information sent to clients. If not present, it defaults to configured port. Servers that do not transmit topology information ignore this setting.")
>       println
>       println("    -f, --cache_manager_class=<clazz>  Cache manager class to be used instead of the default one (it has to extend EmbeddedCacheManager).")
>       println
>       println("    -D<name>[=<value>]                 Set a system property")
>       println
>       System.exit(0)
>    }
> }
> private class ShutdownHook(server: ProtocolServer, cacheManager: CacheContainer) extends Thread with Logging {
>    // Constructor code inline
>    setName("ShutdownHookThread")
>    override def run {
>       if (server != null) {
>          info("Posting Shutdown Request to the server...")
>          val tf = new ThreadFactory {
>             override def newThread(r: Runnable): Thread = new Thread(r, "StopThread")
>          }
>          val f = Executors.newSingleThreadExecutor(tf).submit(new Callable[Void] {
>             override def call = {
>                // Stop server first so that no new requests are allowed
>                server.stop
>                cacheManager.stop
>                null
>             }
>          })
>          try {
>             f.get
>          }
>          catch {
>             case ie: IOException => Thread.interrupted
>             case e: ExecutionException => throw new RuntimeException("Exception encountered in shutting down the server", e)
>          }
>       }
>    }
> }

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira



More information about the infinispan-issues mailing list