[infinispan-issues] [JBoss JIRA] Updated: (ISPN-955) Allow HotRod server to be passed a cache manager class to be used instead of DefaultCacheManager

Octavian Florescu (JIRA) jira-events at lists.jboss.org
Fri Feb 25 18:14:05 EST 2011


     [ https://issues.jboss.org/browse/ISPN-955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Octavian Florescu updated ISPN-955:
-----------------------------------

    Description: 
The cache manager class used by HotRod server is not pluggable. org.infinispan.server.core.Main always uses DefaultCacheManager.
Allow passing in the name of a user-supplied class implementing EmbeddedCacheManager through some means (e.g. a param).

I implemented this functionality already (see the attachment, look for the addition of -f param), so I hope something like this makes it into 5.0.0



  was:
The cache manager class used by HotRod server is not pluggable. org.infinispan.server.core.Main always uses DefaultCacheManager.
Allow passing in the name of a user-supplied class implementing EmbeddedCacheManager through some means (e.g. a param).

I implemented this functionality already (see below, look for the addition of -f param), so I hope something like this makes it into 5.0.0


package org.infinispan.server.core

import java.io.IOException
import java.security.{PrivilegedAction, AccessController}
import java.util.concurrent.{ThreadFactory, ExecutionException, Callable, Executors}
import gnu.getopt.{Getopt, LongOpt}
import org.infinispan.Version
import java.util.Properties
import org.infinispan.util.{TypedProperties, Util}
import org.infinispan.config.{Configuration, GlobalConfiguration}
import org.infinispan.manager.{EmbeddedCacheManager, CacheContainer, DefaultCacheManager}
import org.infinispan.config.GlobalConfiguration.ShutdownHookBehavior

/**
 * Main class for server startup.
 *
 * Parses the command line into a set of server properties, instantiates the
 * protocol server selected with `-r`, creates the cache manager (optionally a
 * user supplied class given with `-f`) and starts the server with it.
 *
 * Invalid input (missing or unknown protocol, unusable cache manager class)
 * is reported on standard error and terminates the JVM with exit status 1.
 *
 * @author Galder Zamarreño
 * @since 4.1
 */
object Main extends Logging {

   // Keys under which the command line options are stored in the server properties
   val PROP_KEY_PORT = "infinispan.server.port"
   val PROP_KEY_HOST = "infinispan.server.host"
   val PROP_KEY_MASTER_THREADS = "infinispan.server.master_threads"
   val PROP_KEY_WORKER_THREADS = "infinispan.server.worker_threads"
   val PROP_KEY_CACHE_CONFIG = "infinispan.server.cache_config"
   val PROP_KEY_PROTOCOL = "infinispan.server.protocol"
   val PROP_KEY_IDLE_TIMEOUT = "infinispan.server.idle_timeout"
   val PROP_KEY_TCP_NO_DELAY = "infinispan.server.tcp_no_delay"
   val PROP_KEY_SEND_BUF_SIZE = "infinispan.server.send_buf_size"
   val PROP_KEY_RECV_BUF_SIZE = "infinispan.server.recv_buf_size"
   val PROP_KEY_PROXY_HOST = "infinispan.server.proxy_host"
   val PROP_KEY_PROXY_PORT = "infinispan.server.proxy_port"
   val PROP_KEY_CACHE_MANAGER_CLASS = "infinispan.server.cache_manager_class"

   // Default values for the settings above
   val HOST_DEFAULT = "127.0.0.1"
   val MASTER_THREADS_DEFAULT = 0
   val WORKER_THREADS_DEFAULT = 0
   val IDLE_TIMEOUT_DEFAULT = -1
   val TCP_NO_DELAY_DEFAULT = true
   val SEND_BUF_SIZE_DEFAULT = 0
   val RECV_BUF_SIZE_DEFAULT = 0

   /**
    * Server properties.  This object holds all of the required
    * information to get the server up and running. Use System
    * properties for defaults.
    */
   private val props: Properties = new TypedProperties(System.getProperties)

   /** Protocol server created by [[boot]]; null until then. */
   private var server: ProtocolServer = _

   /** Cache manager created by [[boot]]; null until then. */
   private var cacheManager: EmbeddedCacheManager = _

   /** @return the protocol server, or null if [[boot]] has not created it yet */
   def getServer: ProtocolServer = server

   /** @return the cache manager, or null if [[boot]] has not created it yet */
   def getCacheManager: EmbeddedCacheManager = cacheManager

   /**
    * Entry point. Runs [[boot]] on a dedicated thread named
    * "InfinispanServer-Main" and waits for it to complete.
    *
    * @param args command line arguments, see [[showAndExit]] for the accepted options
    */
   def main(args: Array[String]) {
      info("Start main with args: {0}", args.mkString(", "))
      val worker = new Callable[Void] {
         override def call: Void = {
            try {
               boot(args)
            }
            catch {
               case e: Exception => {
                  System.err.println("Failed to boot JBoss:")
                  e.printStackTrace
                  // Rethrow so that the failure is also visible through the future
                  throw e
               }
            }
            null
         }
      }
      val threadFactory = new ThreadFactory {
         // TODO Maybe create thread names based on the protocol run
         override def newThread(r: Runnable): Thread = new Thread(r, "InfinispanServer-Main")
      }
      val f = Executors.newSingleThreadScheduledExecutor(threadFactory).submit(worker)
      f.get
   }

   /**
    * Processes the command line, then creates and starts the protocol server
    * together with its cache manager.
    *
    * A missing or unknown protocol prints an error plus the usage text and
    * terminates the JVM with exit status 1.
    *
    * @param args command line arguments
    */
   def boot(args: Array[String]) {
      // First process the command line to pickup custom props/settings
      processCommandLine(args)

      // TODO: move class name and protocol number to a resource file under the corresponding project
      val clazz = props.getProperty(PROP_KEY_PROTOCOL) match {
         case null => exitWithUsageError("ERROR: Please indicate protocol to run with -r parameter")
         case "memcached" => "org.infinispan.server.memcached.MemcachedServer"
         case "hotrod" => "org.infinispan.server.hotrod.HotRodServer"
         case "websocket" => "org.infinispan.server.websocket.WebSocketServer"
         // Without this case an unsupported value would surface as a bare MatchError
         case unknown => exitWithUsageError(
            "ERROR: Unknown protocol '" + unknown + "', please choose one of memcached, hotrod or websocket")
      }
      server = Util.getInstance(clazz).asInstanceOf[ProtocolServer]

      val configFile = props.getProperty(PROP_KEY_CACHE_CONFIG)
      cacheManager = instantiateCacheManager(configFile)
      // Servers need a shutdown hook to close down network layer, so there's no need for an extra shutdown hook.
      // Removing Infinispan's hook also makes shutdown procedures for server and cache manager sequential, avoiding
      // issues with having the JGroups channel disconnected before it's removed itself from the topology view.
      cacheManager.getGlobalConfiguration.setShutdownHookBehavior(ShutdownHookBehavior.DONT_REGISTER)
      addShutdownHook(new ShutdownHook(server, cacheManager))
      server.start(props, cacheManager)
   }

   /**
    * Creates the cache manager, using the class named by the
    * `infinispan.server.cache_manager_class` property when it is set and
    * DefaultCacheManager otherwise.
    *
    * @param configFile path of the cache configuration file, or null to build
    *                   the cache manager from programmatic defaults
    * @return the newly created cache manager
    */
   private def instantiateCacheManager(configFile: String): EmbeddedCacheManager = {
      val clazzName = props.getProperty(PROP_KEY_CACHE_MANAGER_CLASS)
      val clazz: Class[_ <: EmbeddedCacheManager] =
         if (clazzName == null) classOf[DefaultCacheManager]
         else loadCacheManagerClass(clazzName)

      if (configFile == null) createCacheManagerNoConfig(clazz)
      else createCacheManager(clazz, configFile)
   }

   /**
    * Loads a user supplied cache manager class and verifies that it really is
    * an EmbeddedCacheManager, so that a wrong class name is reported clearly
    * instead of failing later with a ClassCastException.
    *
    * Terminates the JVM with exit status 1 if the class cannot be found or
    * has the wrong type.
    *
    * @param clazzName fully qualified name of the class to load
    * @return the loaded class, narrowed to EmbeddedCacheManager
    */
   private def loadCacheManagerClass(clazzName: String): Class[_ <: EmbeddedCacheManager] = {
      val loaded = try {
         Class.forName(clazzName)
      } catch {
         case e: ClassNotFoundException =>
            exitWithError("Cache manager class " + clazzName + " could not be found")
      }
      if (!classOf[EmbeddedCacheManager].isAssignableFrom(loaded))
         exitWithError(loaded.toString + " does not implement " + classOf[EmbeddedCacheManager].getName)
      loaded.asSubclass(classOf[EmbeddedCacheManager])
   }

   /**
    * Creates a cache manager without a configuration file, through the
    * (GlobalConfiguration, Configuration) constructor of the given class.
    * JMX statistics are enabled on both configurations.
    *
    * Terminates the JVM with exit status 1 if the class has no such constructor.
    *
    * @param clazz cache manager class to instantiate
    * @return the newly created cache manager
    */
   private def createCacheManagerNoConfig(clazz: Class[_ <: EmbeddedCacheManager]): EmbeddedCacheManager = {
      val globalCfg = new GlobalConfiguration
      globalCfg.setExposeGlobalJmxStatistics(true)
      val defaultCfg = new Configuration
      defaultCfg.setExposeJmxStatistics(true)

      try {
         val constructor = clazz.getConstructor(classOf[GlobalConfiguration], classOf[Configuration])
         constructor.newInstance(globalCfg, defaultCfg)
      } catch {
         case e: NoSuchMethodException =>
            exitWithError(clazz.toString + " does not have a constructor that takes GlobalConfiguration and Configuration instances")
      }
   }

   /**
    * Creates a cache manager from a configuration file, through the
    * single-String constructor of the given class.
    *
    * Terminates the JVM with exit status 1 if the class has no such constructor.
    *
    * @param clazz cache manager class to instantiate
    * @param configFile path of the cache configuration file
    * @return the newly created cache manager
    */
   private def createCacheManager(clazz: Class[_ <: EmbeddedCacheManager], configFile: String): EmbeddedCacheManager = {
      try {
         val constructor = clazz.getConstructor(classOf[String])
         constructor.newInstance(configFile)
      } catch {
         case e: NoSuchMethodException =>
            exitWithError(clazz.toString + " does not have a constructor that takes a config file path")
      }
   }

   /**
    * Parses the command line and stores every recognised option in [[props]].
    * `-D` options are applied to the JVM system properties instead.
    *
    * `-h` and `-V` print their output and exit with status 0; an option
    * reported by getopt as ':' or '?' exits with status 1.
    *
    * @param args command line arguments
    */
   private def processCommandLine(args: Array[String]) {
      val sopts = "-:hD:Vp:l:m:t:c:r:i:n:s:e:o:x:f:"
      val lopts = Array(
         new LongOpt("help", LongOpt.NO_ARGUMENT, null, 'h'),
         new LongOpt("version", LongOpt.NO_ARGUMENT, null, 'V'),
         new LongOpt("port", LongOpt.REQUIRED_ARGUMENT, null, 'p'),
         new LongOpt("host", LongOpt.REQUIRED_ARGUMENT, null, 'l'),
         new LongOpt("master_threads", LongOpt.REQUIRED_ARGUMENT, null, 'm'),
         new LongOpt("worker_threads", LongOpt.REQUIRED_ARGUMENT, null, 't'),
         new LongOpt("cache_config", LongOpt.REQUIRED_ARGUMENT, null, 'c'),
         new LongOpt("protocol", LongOpt.REQUIRED_ARGUMENT, null, 'r'),
         new LongOpt("idle_timeout", LongOpt.REQUIRED_ARGUMENT, null, 'i'),
         new LongOpt("tcp_no_delay", LongOpt.REQUIRED_ARGUMENT, null, 'n'),
         new LongOpt("send_buf_size", LongOpt.REQUIRED_ARGUMENT, null, 's'),
         new LongOpt("recv_buf_size", LongOpt.REQUIRED_ARGUMENT, null, 'e'),
         new LongOpt("proxy_host", LongOpt.REQUIRED_ARGUMENT, null, 'o'),
         new LongOpt("proxy_port", LongOpt.REQUIRED_ARGUMENT, null, 'x'),
         new LongOpt("cache_manager_class", LongOpt.REQUIRED_ARGUMENT, null, 'f')
         )
      val getopt = new Getopt("startServer", args, sopts, lopts)
      var code = getopt.getopt
      while (code != -1) {
         code match {
            case ':' | '?' => System.exit(1)
            case 1 => System.err.println("startServer: unused non-option argument: " + getopt.getOptarg)
            case 'h' => showAndExit
            case 'V' => {
               Version.printFullVersionInformation
               System.exit(0)
            }
            case 'p' => props.setProperty(PROP_KEY_PORT, getopt.getOptarg)
            case 'l' => props.setProperty(PROP_KEY_HOST, getopt.getOptarg)
            case 'm' => props.setProperty(PROP_KEY_MASTER_THREADS, getopt.getOptarg)
            case 't' => props.setProperty(PROP_KEY_WORKER_THREADS, getopt.getOptarg)
            case 'c' => props.setProperty(PROP_KEY_CACHE_CONFIG, getopt.getOptarg)
            case 'r' => props.setProperty(PROP_KEY_PROTOCOL, getopt.getOptarg)
            case 'i' => props.setProperty(PROP_KEY_IDLE_TIMEOUT, getopt.getOptarg)
            case 'n' => props.setProperty(PROP_KEY_TCP_NO_DELAY, getopt.getOptarg)
            case 's' => props.setProperty(PROP_KEY_SEND_BUF_SIZE, getopt.getOptarg)
            case 'e' => props.setProperty(PROP_KEY_RECV_BUF_SIZE, getopt.getOptarg)
            case 'o' => props.setProperty(PROP_KEY_PROXY_HOST, getopt.getOptarg)
            case 'x' => props.setProperty(PROP_KEY_PROXY_PORT, getopt.getOptarg)
            case 'f' => props.setProperty(PROP_KEY_CACHE_MANAGER_CLASS, getopt.getOptarg)
            case 'D' => {
               // -Dname[=value]; a bare name is treated as name=true
               val arg = getopt.getOptarg
               val i = arg.indexOf("=")
               val (name, value) =
                  if (i == -1) (arg, "true")
                  else (arg.substring(0, i), arg.substring(i + 1))
               // NOTE(review): only the JVM system properties are updated here; whether `props`
               // observes the change depends on how TypedProperties treats its source - confirm
               System.setProperty(name, value)
            }
            case _ => throw new Exception("unhandled option code: " + code)
         }
         code = getopt.getopt
      }
   }

   /**
    * Registers the given thread as a JVM shutdown hook inside a privileged action.
    *
    * @param shutdownHook thread to run when the JVM shuts down
    */
   private def addShutdownHook(shutdownHook: Thread): Unit = {
      AccessController.doPrivileged(new PrivilegedAction[Void] {
         override def run: Void = {
            Runtime.getRuntime.addShutdownHook(shutdownHook)
            null
         }
      })
   }

   /**
    * Prints the message on standard error and terminates the JVM with exit status 1.
    *
    * @param message description of the fatal problem
    */
   private def exitWithError(message: String): Nothing = {
      System.err.println(message)
      System.exit(1)
      // System.exit does not return normally; the throw only gives this method type Nothing
      throw new IllegalStateException("System.exit returned")
   }

   /**
    * Prints the message on standard error followed by the usage text, then
    * terminates the JVM with exit status 1.
    *
    * @param message description of the command line problem
    */
   private def exitWithUsageError(message: String): Nothing = {
      System.err.println(message)
      showUsage()
      System.exit(1)
      // System.exit does not return normally; the throw only gives this method type Nothing
      throw new IllegalStateException("System.exit returned")
   }

   /** Prints the usage text and terminates the JVM with exit status 0. */
   private def showAndExit {
      showUsage()
      System.exit(0)
   }

   /** Prints the usage text on standard output. */
   private def showUsage() {
      println("usage: startServer [options]")
      println()
      println("options:")
      println("    -h, --help                         Show this help message")
      println()
      println("    -V, --version                      Show version information")
      println()
      println("    --                                 Stop processing options")
      println()
      println("    -p, --port=<num>                   TCP port number to listen on (default: 11211 for Memcached, 11222 for Hot Rod and 8181 for WebSocket server)")
      println()
      println("    -l, --host=<host or ip>            Interface to listen on (default: 127.0.0.1, localhost)")
      println()
      println("    -m, --master_threads=<num>         Number of threads accepting incoming connections (default: unlimited while resources are available)")
      println()
      // The long option registered with getopt is worker_threads
      println("    -t, --worker_threads=<num>         Number of threads processing incoming requests and sending responses (default: unlimited while resources are available)")
      println()
      println("    -c, --cache_config=<filename>      Cache configuration file (default: creates cache with default values)")
      println()
      println("    -r, --protocol=                    Protocol to understand by the server. This is a mandatory option and you should choose one of these options")
      println("          [memcached|hotrod|websocket]")
      println()
      println("    -i, --idle_timeout=<num>           Idle read timeout, in seconds, used to detect stale connections (default: -1).")
      println("                                       If no new messages have been read within this time, the server disconnects the channel.")
      println("                                       Passing -1 disables idle timeout.")
      println()
      println("    -n, --tcp_no_delay=[true|false]    TCP no delay flag switch (default: true).")
      println()
      println("    -s, --send_buf_size=<num>          Send buffer size (default: as defined by the OS).")
      println()
      println("    -e, --recv_buf_size=<num>          Receive buffer size (default: as defined by the OS).")
      println()
      println("    -o, --proxy_host=<host or ip>      Host address to expose in topology information sent to clients. If not present, it defaults to configured host. Servers that do not transmit topology information ignore this setting.")
      println()
      println("    -x, --proxy_port=<num>             Port to expose in topology information sent to clients. If not present, it defaults to configured port. Servers that do not transmit topology information ignore this setting.")
      println()
      println("    -f, --cache_manager_class=<clazz>  Cache manager class to be used instead of the default one (it has to extend EmbeddedCacheManager).")
      println()
      println("    -D<name>[=<value>]                 Set a system property")
      println()
   }
}

/**
 * JVM shutdown hook that stops the protocol server and then the cache
 * manager, in that order, on a separate thread named "StopThread" and waits
 * for both to finish.
 *
 * @param server protocol server to stop; when null the hook does nothing
 * @param cacheManager cache container to stop once the server is stopped
 */
private class ShutdownHook(server: ProtocolServer, cacheManager: CacheContainer) extends Thread with Logging {

   // Constructor code inline
   setName("ShutdownHookThread")

   /**
    * Submits the stop sequence and blocks until it completes.
    *
    * @throws RuntimeException if stopping the server or the cache manager fails
    */
   override def run(): Unit = {
      if (server != null) {
         info("Posting Shutdown Request to the server...")
         val tf = new ThreadFactory {
            override def newThread(r: Runnable): Thread = new Thread(r, "StopThread")
         }

         val executor = Executors.newSingleThreadExecutor(tf)
         val f = executor.submit(new Callable[Void] {
            override def call: Void = {
               // Stop server first so that no new requests are allowed
               server.stop
               cacheManager.stop
               null
            }
         })
         try {
            f.get
         }
         catch {
            // Future.get signals interruption with InterruptedException; restore the
            // interrupt status instead of clearing it so the caller can observe it
            case ie: InterruptedException => Thread.currentThread.interrupt()
            case e: ExecutionException => throw new RuntimeException("Exception encountered in shutting down the server", e)
         }
         finally {
            // Let the stop thread terminate once the submitted task is done
            executor.shutdown()
         }
      }
   }
}




> Allow HotRod server to be passed a cache manager class to be used instead of DefaultCacheManager
> ------------------------------------------------------------------------------------------------
>
>                 Key: ISPN-955
>                 URL: https://issues.jboss.org/browse/ISPN-955
>             Project: Infinispan
>          Issue Type: Enhancement
>          Components: Cache Server
>    Affects Versions: 5.0.0.ALPHA3
>         Environment: 4.2.1.CR1
>            Reporter: Octavian Florescu
>            Assignee: Manik Surtani
>         Attachments: Main.scala
>
>
> The cache manager class used by HotRod server is not pluggable. org.infinispan.server.core.Main always uses DefaultCacheManager.
> Allow passing in the name of a user-supplied class implementing EmbeddedCacheManager through some means (e.g. a param).
> I implemented this functionality already (see the attachment, look for the addition of -f param), so I hope something like this makes it into 5.0.0

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira



More information about the infinispan-issues mailing list