[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting ...

Ovidiu Feodorov ovidiu.feodorov at jboss.com
Tue Jan 23 00:08:21 EST 2007


  User: ovidiu  
  Date: 07/01/23 00:08:21

  Modified:    src/main/org/jboss/remoting   Tag: remoting_2_x
                        MicroRemoteClientInvoker.java ServerInvoker.java
  Log:
  more reformatting and minor refactoring
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.7.2.8   +2 -2      JBossRemoting/src/main/org/jboss/remoting/MicroRemoteClientInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: MicroRemoteClientInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/MicroRemoteClientInvoker.java,v
  retrieving revision 1.7.2.7
  retrieving revision 1.7.2.8
  diff -u -b -r1.7.2.7 -r1.7.2.8
  --- MicroRemoteClientInvoker.java	21 Jan 2007 08:38:32 -0000	1.7.2.7
  +++ MicroRemoteClientInvoker.java	23 Jan 2007 05:08:21 -0000	1.7.2.8
  @@ -27,7 +27,7 @@
    *
    * @author <a href="mailto:jhaynie at vocalocity.net">Jeff Haynie</a>
    * @author <a href="mailto:telrod at e2technologies.net">Tom Elrod</a>
  - * @version $Revision: 1.7.2.7 $
  + * @version $Revision: 1.7.2.8 $
    */
   public abstract class MicroRemoteClientInvoker extends AbstractInvoker implements ClientInvoker
   {
  @@ -121,7 +121,7 @@
         // Now check if is remoting response and process
         if (returnValue instanceof InvocationResponse)
         {
  -         InvocationResponse response = (InvocationResponse) returnValue;
  +         InvocationResponse response = (InvocationResponse)returnValue;
            returnValue = response.getResult();
   
            // if is a server side exception, throw it
  
  
  
  1.52.2.21 +868 -858  JBossRemoting/src/main/org/jboss/remoting/ServerInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/ServerInvoker.java,v
  retrieving revision 1.52.2.20
  retrieving revision 1.52.2.21
  diff -u -b -r1.52.2.20 -r1.52.2.21
  --- ServerInvoker.java	21 Jan 2007 08:38:32 -0000	1.52.2.20
  +++ ServerInvoker.java	23 Jan 2007 05:08:21 -0000	1.52.2.21
  @@ -65,17 +65,17 @@
    * @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    *
  - * @version $Revision: 1.52.2.20 $
  + * @version $Revision: 1.52.2.21 $
    */
   public abstract class ServerInvoker extends AbstractInvoker implements ServerInvokerMBean
   {
  -   private static final Logger log = Logger.getLogger(ServerInvoker.class);
  +   // Constants ------------------------------------------------------------------------------------
   
  -   private static boolean trace = log.isTraceEnabled();
  +   private static final Logger log = Logger.getLogger(ServerInvoker.class);
   
      /**
  -    * Key for the the maximum number of thread to be used in the thread pool
  -    * for one way invocations (server side).
  +    * Key for the the maximum number of threads to be used in the thread pool for one way
  +    * invocations (server side).
       * This property is only used when the default oneway thread pool is used.
       */
      public static final String MAX_NUM_ONEWAY_THREADS_KEY = "maxNumThreadsOneway";
  @@ -84,15 +84,17 @@
       * Key for setting the setting the oneway thread pool to use.
       * The value used with this key will first be checked to see if is a JMX ObjectName and if so,
       * try to look up associated mbean for the ObjectName given and cast to type
  -    * org.jboss.util.threadpool.ThreadPoolMBean (via MBeanServerInvocationHandler.newProxyInstance()).
  -    * If the value is not a JMX ObjectName, will assume is a fully qualified classname and load the
  -    * coresponding class and create a new instance of it (which will require it to have a void constructor).
  -    * The newly created instance will then be cast to type of org.jboss.util.threadpool.ThreadPool.
  +    * org.jboss.util.threadpool.ThreadPoolMBean
  +    * (via MBeanServerInvocationHandler.newProxyInstance()). If the value is not a JMX ObjectName,
  +    * will assume is a fully qualified classname and load the coresponding class and create a new
  +    * instance of it (which will require it to have a void constructor). The newly created instance
  +    * will then be cast to type of org.jboss.util.threadpool.ThreadPool.
       */
      public static final String ONEWAY_THREAD_POOL_CLASS_KEY = "onewayThreadPool";
   
      /**
  -    * Key for setting the address the server invoker should bind to.  The value can be either host name or IP.
  +    * Key for setting the address the server invoker should bind to.
  +    * The value can be either host name or IP.
       */
      public static final String SERVER_BIND_ADDRESS_KEY = "serverBindAddress";
   
  @@ -100,31 +102,34 @@
       * Key for setting the addres the client invoker should connecto to.
       * This should be used when client will be connecting to server from outside the server's network
       * and the external address is different from that of the internal address the server invoker
  -    * will bind to (e.g. using NAT to expose different external address).  This will mostly be
  -    * useful when client uses remoting detection to discover remoting servers.
  -    * The value can be either host name or IP.
  +    * will bind to (e.g. using NAT to expose different external address). This will mostly be useful
  +    * when client uses remoting detection to discover remoting servers. The value can be either host
  +    * name or IP.
       */
      public static final String CLIENT_CONNECT_ADDRESS_KEY = "clientConnectAddress";
   
      /**
       * Key for setting the port the server invoker should bind to.
  -    * If the value supplied is less than or equal to zero, the server invoker will randomly choose a free port to use.
  +    * If the value supplied is less than or equal to zero, the server invoker will randomly choose
  +    * a free port to use.
       */
      public static final String SERVER_BIND_PORT_KEY = "serverBindPort";
   
      /**
       * key for setting the port the client invoker should connect to.
  -    * This should be used when client will be connecting to server from outside the server's
  -    * network and the external port is different from that of the internal port the server
  -    * invoker will bind to (e.g. using NAT to expose different port routing).
  -    * This will be mostly useful when client uses remoting detection to discover remoting servers.
  +    * This should be used when client will be connecting to server from outside the server's network
  +    * and the external port is different from that of the internal port the server invoker will bind
  +    * to (e.g. using NAT to expose different port routing). This will be mostly useful when client
  +    * uses remoting detection to discover remoting servers.
       */
      public static final String CLIENT_CONNECT_PORT_KEY = "clientConnectPort";
   
      /**
  -    * Key used for setting the amount of time (in milliseconds) that a client should renew its lease.
  -    * If this value is not set, the default of five seconds (see DEFAULT_CLIENT_LEASE_PERIOD), will be used.
  -    * This value will also be what is given to the client when it initially querys server for leasing information.
  +    * Key used for setting the amount of time (in milliseconds) that a client should renew its
  +    * lease.
  +    * If this value is not set, the default of five seconds (see DEFAULT_CLIENT_LEASE_PERIOD), will
  +    * be used. This value will also be what is given to the client when it initially querys server
  +    * for leasing information.
       */
      public static final String CLIENT_LEASE_PERIOD = "clientLeasePeriod";
   
  @@ -139,52 +144,50 @@
       * a proxy to it with type of org.jboss.remoting.security.ServerSocketFactoryMBean
       * (via MBeanServerInvocationHandler.newProxyInstance()).  If not a JMX ObjectName, will assume
       * is the fully qualified classname to the implementation to be used and will load the class,
  -    * create a new instance of it (which requires it to have a void constructor).
  -    * The instance will then be cast to type javax.net.ServerSocketFactory.
  +    * create a new instance of it (which requires it to have a void constructor). The instance will
  +    * then be cast to type javax.net.ServerSocketFactory.
       */
      public static final String SERVER_SOCKET_FACTORY = "serverSocketFactory";
   
      /**
  -    * The max number of worker threads to be used in the
  -    * pool for processing one way calls on the server side.
  -    * Value is is 100.
  +    * The max number of worker threads to be used in the pool for processing one way calls on the
  +    * server side. Value is is 100.
       */
      public static final int MAX_NUM_ONEWAY_THREADS = 100;
   
      /**
  -    * The default lease period for clients.  This is the number of
  -    * milliseconds that a client will be required to renew their lease
  -    * with the server.  The default value is 5 seconds.
  +    * The default lease period for clients. This is the number of milliseconds that a client will be
  +    * required to renew their lease with the server. The default value is 5 seconds.
       */
      public static final int DEFAULT_CLIENT_LEASE_PERIOD = 5000;
   
      /**
  -    * The default timeout period for socket connections.
  -    * The default value is 60000 milliseconds.
  +    * The default timeout period for socket connections. The default value is 60000 milliseconds.
       */
      public static final int DEFAULT_TIMEOUT_PERIOD = 60000;
  +
  +
  +   // Static ---------------------------------------------------------------------------------------
  +
  +   private static boolean trace = log.isTraceEnabled();
  +
  +   // Attributes -----------------------------------------------------------------------------------
  +
      /**
       * Indicated the max number of threads used within oneway thread pool.
       */
      private int maxNumberThreads = MAX_NUM_ONEWAY_THREADS;
      private String onewayThreadPoolClass = null;
      private ThreadPool onewayThreadPool;
  -
  -   protected Map handlers = new HashMap();
  -   protected Map callbackHandlers = new HashMap();
  -   protected Map clientCallbackListener = new HashMap();
  -   protected boolean started = false;
      private boolean created = false;
   
      private MBeanServer mbeanServer = null;
   
      private String dataType;
  -
      private String serverBindAddress = null;
      private int serverBindPort = 0;
      private String clientConnectAddress = null;
      private int clientConnectPort = -1;
  -
      private int timeout = DEFAULT_TIMEOUT_PERIOD;
   
      // indicates the lease timeout period for clients
  @@ -192,10 +195,16 @@
      private boolean leaseManagement = false;
      private Map clientLeases = new HashMap();
   
  -   protected ConnectionNotifier connectionNotifier = new ConnectionNotifier();
   
  +   protected Map handlers = new HashMap();
  +   protected Map callbackHandlers = new HashMap();
  +   protected Map clientCallbackListener = new HashMap();
  +   protected boolean started = false;
  +   protected ConnectionNotifier connectionNotifier = new ConnectionNotifier();
      protected ServerSocketFactory serverSocketFactory = null;
   
  +   // Constructors ---------------------------------------------------------------------------------
  +
      public ServerInvoker(InvokerLocator locator)
      {
         super(locator);
  @@ -211,7 +220,9 @@
         super(locator, configuration);
   
         if (configuration != null)
  +      {
            this.configuration.putAll(configuration);
  +      }
   
         Map locatorParams = locator.getParameters();
         if(locatorParams != null)
  @@ -220,987 +231,974 @@
         }
      }
   
  -   protected void setup() throws Exception
  -   {
  -      Map config = getConfiguration();
  -      String maxNumOfThreads = (String) config.get(MAX_NUM_ONEWAY_THREADS_KEY);
  -      if(maxNumOfThreads != null && maxNumOfThreads.length() > 0)
  -      {
  -         try
  +   // Public ---------------------------------------------------------------------------------------
  +
  +   public void setServerSocketFactory(ServerSocketFactory serverSocketFactory)
            {
  -            maxNumberThreads = Integer.parseInt(maxNumOfThreads);
  +      this.serverSocketFactory = serverSocketFactory;
            }
  -         catch(NumberFormatException e)
  +
  +   public ServerSocketFactory getServerSocketFactory()
            {
  -            log.error("Can not convert max number of threads value (" + maxNumOfThreads + ") into a number.");
  -         }
  +      return serverSocketFactory;
         }
  -      onewayThreadPoolClass = (String) config.get(ONEWAY_THREAD_POOL_CLASS_KEY);
   
  -      String locatorHost = locator.getHost();
  -      InetAddress addr = null;
  -      if(locatorHost != null)
  +   /**
  +    * Sets timeout (in millseconds) to be used for the socket connection.
  +    */
  +   public void setTimeout(int timeout)
         {
  -         addr = InetAddress.getByName(locator.getHost());
  +      this.timeout = timeout;
         }
  -      else
  +
  +   /**
  +    * The timeout (in milliseconds) used for the socket connection.
  +    */
  +   public int getTimeout()
         {
  -         addr = InetAddress.getLocalHost();
  +      return timeout;
         }
  -      int port = locator.getPort();
  -      if(port <= 0)
  +
  +   public boolean isLeaseActivated()
         {
  -         port = assignPort();
  +      return leaseManagement;
         }
   
  -      // set the bind address
  -      serverBindAddress = (String) config.get(SERVER_BIND_ADDRESS_KEY);
  -      clientConnectAddress = (String) config.get(CLIENT_CONNECT_ADDRESS_KEY);
  -      if(serverBindAddress == null)
  +   public void addConnectionListener(ConnectionListener listener)
         {
  -         if(clientConnectAddress != null)
  +      if(listener != null)
            {
  -            // can't use uri address, as is for client only
  -            serverBindAddress = InetAddress.getLocalHost().getHostAddress();
  +         connectionNotifier.addListener(listener);
  +
  +         if(leasePeriod > 0)
  +         {
  +            leaseManagement = true;
  +         }
            }
            else
            {
  -            serverBindAddress = addr.getHostAddress();
  +         throw new IllegalArgumentException("Can not add null ConnectionListener.");
            }
         }
   
  -      // set the bind port
  -      String serverBindPortString = (String) config.get(SERVER_BIND_PORT_KEY);
  -      String clientConnectPortString = (String) config.get(CLIENT_CONNECT_PORT_KEY);
  -      if(clientConnectPortString != null)
  -      {
  -         try
  -         {
  -            clientConnectPort = Integer.parseInt(clientConnectPortString);
  -         }
  -         catch(NumberFormatException e)
  +   public void removeConnectionListener(ConnectionListener listener)
            {
  -            throw new InvalidConfigurationException("Can not set client bind port because can not convert given value (" + clientConnectPortString + ") to a number.");
  -         }
  -      }
  -      if(serverBindPortString != null)
  +      if(connectionNotifier != null)
         {
  -         try
  +         connectionNotifier.removeListener(listener);
  +
  +         // turn off lease management if no listeners (since no one to tell client died)
  +         if(connectionNotifier.size() == 0)
            {
  -            serverBindPort = Integer.parseInt(serverBindPortString);
  -            if(serverBindPort <= 0)
  +            leaseManagement = false;
  +
  +            // go through any existing leases and terminate them
  +            Set clientKeys = clientLeases.keySet();
  +            Iterator itr = clientKeys.iterator();
  +            while(itr.hasNext())
               {
  -               serverBindPort = assignPort();
  +               String sessionId = (String)itr.next();
  +               Lease clientLease = (Lease)clientLeases.get(sessionId);
  +               clientLease.terminateLease(sessionId);
               }
  -
  +            clientLeases.clear();
            }
  -         catch(NumberFormatException e)
  -         {
  -            throw new InvalidConfigurationException("Can not set server bind port because can not convert given value (" + serverBindPortString + ") to a number.");
            }
         }
  -      else
  +
  +   /**
  +    * Sets the amount of time (in milliseconds) that a client should renew its lease. If this value
  +    * is not set, the default of five seconds (see DEFAULT_CLIENT_LEASE_PERIOD), will be used. This
  +    * value will also be what is given to the client when it initially querys server for leasing
  +    * information. If set after create() method called, this value will override value set by
  +    * CLIENT_LEASE_PERIOD key.
  +    */
  +   public void setLeasePeriod(long leasePeriodValue)
         {
  -         if(clientConnectPort > 0)
  +      this.leasePeriod = leasePeriodValue;
  +
  +      if (leasePeriod <= 0)
            {
  -            // can't use uri port, as is for client only
  -            serverBindPort = PortUtil.findFreePort(locator.getHost());
  +    	  this.leaseManagement = false;
            }
            else
            {
  -            serverBindPort = port;
  -         }
  -      }
  -
  -      // get timeout config
  -      String timeoutPeriod = (String)config.get(TIMEOUT);
  -      if(timeoutPeriod != null && timeoutPeriod.length() > 0)
  -      {
  -         try
  +         if(connectionNotifier != null && connectionNotifier.size() > 0)
            {
  -            timeout = Integer.parseInt(timeoutPeriod);
  +            this.leaseManagement = true;
            }
  -         catch(NumberFormatException e)
  -         {
  -            throw new InvalidConfigurationException("Can not set timeout because can not convert give value (" + timeoutPeriod + ") to a number.");
            }
         }
   
  -      // config for client lease period
  -      String clientLeasePeriod = (String)config.get(CLIENT_LEASE_PERIOD);
  -      if(clientLeasePeriod != null)
  -      {
  -         try
  +   /**
  +    * Gets the amount of time (in milliseconds) that a client should renew its lease.
  +    */
  +   public long getLeasePeriod()
            {
  -            long leasePeriodValue = Long.parseLong(clientLeasePeriod);
  -            setLeasePeriod(leasePeriodValue);
  +      return leasePeriod;
            }
  -         catch(NumberFormatException e)
  +
  +   /**
  +    * @jmx:managed-attribute
  +    */
  +   public String getClientConnectAddress()
            {
  -            throw new InvalidConfigurationException("Can not set client lease period because can not convert given value (" + clientLeasePeriod + ") to a number.");
  -         }
  +      return clientConnectAddress;
         }
   
  -      createServerSocketFactory();
  +   public int getClientConnectPort()
  +   {
  +      return clientConnectPort;
      }
   
  -   protected int assignPort() throws IOException
  +   public void setClientConnectPort(int clientConnectPort)
      {
  -      int port;
  -      port = PortUtil.findFreePort(locator.getHost());
  -      // re-write locator since the port is different
  -      InvokerLocator newLocator = new InvokerLocator(locator.getProtocol(), locator.getHost(), port, locator.getPath(), locator.getParameters());
  -      // need to update the locator key used in the invoker registry
  -      InvokerRegistry.updateServerInvokerLocator(locator, newLocator);
  -      this.locator = newLocator;
  -      return port;
  +      this.clientConnectPort = clientConnectPort;
      }
   
  -   public void setServerSocketFactory(ServerSocketFactory serverSocketFactory)
  +   /**
  +    * This method should only be called by the service controller when this invoker is specified
  +    * within the Connector configuration of a service xml. Calling this directly will have no
  +    * effect, as will be used in building the locator uri that is published for detection and this
  +    * happens when the invoker is first created and started (after that, no one will be aware of a
  +    * change).
  +    *
  +    * @jmx:managed-attribute
  +    */
  +   public void setClientConnectAddress(String clientConnectAddress)
      {
  -      this.serverSocketFactory = serverSocketFactory;
  +      this.clientConnectAddress = clientConnectAddress;
      }
   
  -   public ServerSocketFactory getServerSocketFactory()
  +   public String getServerBindAddress()
      {
  -      return serverSocketFactory;
  +      return serverBindAddress;
      }
   
  -   protected ServerSocketFactory createServerSocketFactory() throws IOException
  -   {
  -      // only want to look at config if server socket factory has not already been set
  -      if(serverSocketFactory == null)
  -      {
  -         Object obj = configuration.get(Remoting.CUSTOM_SERVER_SOCKET_FACTORY);
  -         if (obj != null)
  -         {
  -            if (obj instanceof ServerSocketFactory)
  +   public int getServerBindPort()
               {
  -               serverSocketFactory = (ServerSocketFactory) obj;
  +      return serverBindPort;
               }
  -            else
  +
  +   /**
  +    * Sets the maximum number of thread to be used in the thread pool for one way invocations
  +    * (server side). This property is only used when the default oneway thread pool is used. If set
  +    * after create() method called, this value will override value set by MAX_NUM_ONEWAY_THREADS_KEY
  +    * key.
  +    */
  +   public void setMaxNumberOfOnewayThreads(int numOfThreads)
               {
  -               throw new RuntimeException("Can not set custom server socket factory (" + obj + ") as is not of type javax.net.SocketFactory");
  -            }
  +      this.maxNumberThreads = numOfThreads;
            }
   
  -         if (serverSocketFactory == null)
  -         {
               /**
  -             * TODO: -TME This is another big hack because of dependancy on JMX within configuration.
  -             * Have to wait till the mbean server is set before can actually set the server socket
  -             * factory since it is an mbean (new server's DI will fix all this).  Would prefer this
  -             * to be in the setup() method...
  -             * Also, I can't cast the mbean proxy directly to ServerSocketFactory because it is not
  -             * an interface.  Therefore, have to require that ServerSocketFactoryMBean is used.
  +    * Gets the maximum number of thread to be used in the thread pool for one way invocations
  +    * (server side).
                */
  -            String serverSocketFactoryString = (String)configuration.get(SERVER_SOCKET_FACTORY);
  -            if(serverSocketFactoryString != null && serverSocketFactoryString.length() > 0)
  -            {
  -               try
  -               {
  -                  if(serverSocketFactoryString != null)
  +   public int getMaxNumberOfOnewayThreads()
                     {
  -                     MBeanServer server = getMBeanServer();
  -                     ObjectName serverSocketFactoryObjName = new ObjectName(serverSocketFactoryString);
  -                     if(server != null)
  +      return this.maxNumberThreads;
  +   }
  +
  +   /**
  +    * Gets the oneway thread pool to use.
  +    */
  +   public ThreadPool getOnewayThreadPool()
                        {
  -                        try
  +      if(onewayThreadPool == null)
                           {
  -                           ServerSocketFactoryMBean serverSocketFactoryMBean = (ServerSocketFactoryMBean) MBeanServerInvocationHandler.newProxyInstance(server,
  -                                                                                                                                                        serverSocketFactoryObjName,
  -                                 																														ServerSocketFactoryMBean.class,
  -                                 																														false);
  -                           serverSocketFactory = new ServerSocketFactoryWrapper(serverSocketFactoryMBean);
  -                        }
  -                        catch(Exception e)
  +         // if no thread pool class set, then use default BasicThreadPool
  +         if(onewayThreadPoolClass == null || onewayThreadPoolClass.length() == 0)
                           {
  -                           log.debug("Error creating mbean proxy for server socket factory for object name: " + serverSocketFactoryObjName + ".  Will try by class name.");
  -                        }
  +            BasicThreadPool pool = new BasicThreadPool("JBossRemoting Client Oneway");
  +            pool.setMaximumPoolSize(maxNumberThreads);
  +            pool.setBlockingMode(BlockingMode.WAIT);
  +            onewayThreadPool = pool;
                        }
                        else
                        {
  -                        log.debug("The 'serverSocketFactory' attribute was set with a value, but the MBeanServer reference is null.");
  -                     }
  -                  }
  -               }
  -               catch(MalformedObjectNameException e)
  +            //first check to see if this is an ObjectName
  +            boolean isObjName = false;
  +            try
                  {
  -                  log.debug("Attibute value (" + serverSocketFactoryString + ") passed is not a valid ObjectName.  Can not look up if is a mbean service.  Will try by classname.");
  +               ObjectName objName = new ObjectName(onewayThreadPoolClass);
  +               onewayThreadPool = createThreadPoolProxy(objName);
  +               isObjName = true;
                  }
  -               catch(NullPointerException e)
  +            catch(MalformedObjectNameException e)
                  {
  -                  log.debug("Could not set up the server socket factory as a mbean service due to null pointer exception.");
  +               log.debug("Thread pool class supplied is not an object name.");
                  }
   
  -               // couldn't create from object name for mbean service, will try by class name
  -               if(serverSocketFactory == null)
  +            if(!isObjName)
                  {
                     try
                     {
  -                     Class cl = ClassLoaderUtility.loadClass(serverSocketFactoryString, getClass());
  -
  -                     Constructor serverSocketConstructor = null;
  -                     serverSocketConstructor = cl.getConstructor(new Class[]{});
  -                     serverSocketFactory = (ServerSocketFactory)serverSocketConstructor.newInstance(new Object[] {});
  -                     log.trace("ServerSocketFactory (" + serverSocketFactoryString + ") loaded");
  +                  onewayThreadPool = (ThreadPool)Class.
  +                     forName(onewayThreadPoolClass, false, getClassLoader()).newInstance();
                     }
                     catch(Exception e)
                     {
  -                     log.debug("Could not create server socket factory by classname (" + serverSocketFactoryString + ").  Error message: " + e.getMessage());
  +                  throw new RuntimeException("Error loading instance of ThreadPool based " +
  +                     "on class name " + onewayThreadPoolClass);
                     }
                  }
               }
            }
  +      return onewayThreadPool;
         }
   
  -      if (serverSocketFactory == null && needsCustomSSLConfiguration(configuration))
  -      {
  -         try
  +   /**
  +    * Sets the oneway thread pool to use.
  +    */
  +   public void setOnewayThreadPool(ThreadPool pool)
            {
  -            SSLSocketBuilder socketBuilder = new SSLSocketBuilder(configuration);
  -            socketBuilder.setUseSSLServerSocketFactory( false );
  -            serverSocketFactory = socketBuilder.createSSLServerSocketFactory();
  +      this.onewayThreadPool = pool;
            }
  -         catch (IOException e)
  +
  +   public MBeanServer getMBeanServer()
            {
  -            throw new RuntimeException("Unable to create customized SSL socket factory", e);
  -         }
  +      return mbeanServer;
         }
   
  -      if(serverSocketFactory == null)
  +   public void setMBeanServer(MBeanServer server)
         {
  -         log.debug(this + " did not find server socket factory configuration as mbean service " +
  -            "or classname. Creating default server socket factory.");
  -
  -         serverSocketFactory = getDefaultServerSocketFactory();
  +      // This has been added in order to support mbean service configuration. Now supporting
  +      // classes, such as the ServerInvokerCallbackHandler can find and use resources such as
  +      // CallbackStore, which can be run as a service mbean (and specified via object name within
  +      // config). The use of JMX throughout remoting is a problem as now have to tie it in all
  +      // throughout the code for service configuration as is being done here. When migrate to use
  +      // under new server model, which does not depend on JMX, can rip out code such as this.
  +      this.mbeanServer = server;
         }
   
  -      log.debug(this + " created server socket factory " + serverSocketFactory);
  -
  -      serverSocketFactory = wrapServerSocketFactory(serverSocketFactory, configuration);
  -      return serverSocketFactory;
  -
  +   /**
  +    * @return true if a server invocation handler has been registered for this subsystem.
  +    */
  +   public synchronized boolean hasInvocationHandler(String subsystem)
  +   {
  +      return handlers.containsKey(subsystem);
      }
   
  -   protected boolean justNeedsSSLClientMode(Map configuration)
  -   {
  -      if (configuration.size() == 1 && configuration.containsKey(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE))
  +   /**
  +    * @return an array of keys for each subsystem this invoker can handle.
  +    */
  +   public synchronized String[] getSupportedSubsystems()
         {
  -         String useClientModeString = (String) configuration.get(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE);
  -         return Boolean.valueOf(useClientModeString).booleanValue();
  +      String subsystems [] = new String[handlers.size()];
  +      return (String[]) handlers.keySet().toArray(subsystems);
         }
   
  -      if (configuration.size() == 1 && configuration.containsKey(SSLSocketBuilder.REMOTING_SOCKET_USE_CLIENT_MODE))
  +   /**
  +    * @return an array of the server invocation handlers.
  +    */
  +   public synchronized ServerInvocationHandler[] getInvocationHandlers()
         {
  -         String useClientModeString = (String) configuration.get(SSLSocketBuilder.REMOTING_SOCKET_USE_CLIENT_MODE);
  -         return Boolean.valueOf(useClientModeString).booleanValue();
  +      ServerInvocationHandler ih [] = new ServerInvocationHandler[handlers.size()];
  +      return (ServerInvocationHandler[]) handlers.values().toArray(ih);
         }
   
  -      if (configuration.size() == 2
  -            && configuration.containsKey(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE)
  -            && configuration.containsKey(SSLSocketBuilder.REMOTING_SOCKET_USE_CLIENT_MODE))
  +   /**
  +    * Add a server invocation handler for a particular subsystem. Typically, subsystems are defined
  +    * in org.jboss.remoting.Subsystem, however, this can be any string that the caller knows about.
  +    *
  +    * @return previous ServerInvocationHandler with the same sybsystem value (case insensitive) or
  +    *         null if a previous one did not exist.
  +    */
  +   public synchronized ServerInvocationHandler addInvocationHandler(String subsystem,
  +                                                                    ServerInvocationHandler handler)
         {
  -         String useClientModeString = (String) configuration.get(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE);
  -         return Boolean.valueOf(useClientModeString).booleanValue();
  -      }
  +      handler.setInvoker(this);
   
  -      return false;
  +      ServerInvocationHandler oldHandler =
  +         (ServerInvocationHandler)handlers.put(subsystem.toUpperCase(), handler);
  +
  +      log.debug(this + " added " + handler + " for subsystem '" + subsystem + "'" +
  +         (oldHandler == null ? "" : ", replacing old handler " + oldHandler));
  +
  +      return oldHandler;
      }
   
      /**
  -    * Gets the default server socket factory to use for the server invoker.  The intention
  -    * is this method will be overridden by sub-classes for their specific defaults.
  -    * @return
  +    * Remove a subsystem invocation handler.
       */
  -   protected ServerSocketFactory getDefaultServerSocketFactory() throws IOException
  +   public synchronized ServerInvocationHandler removeInvocationHandler(String subsystem)
      {
  -      return ServerSocketFactory.getDefault();
  -   }
  +      ServerInvocationHandler handler =
  +         (ServerInvocationHandler)handlers.remove(subsystem.toUpperCase());
  +
  +      log.debug(this + (handler == null ?
  +         " tried to remove handler for " + subsystem + " but no handler found" :
  +         " removed handler " + handler + " for subsystem '" + subsystem + "'"));
   
  +      return handler;
  +   }
   
  -   protected ServerSocketFactory wrapServerSocketFactory(ServerSocketFactory ssf, Map config)
  +   /**
  +    * Get a ServerInvocationHandler for a given subsystem type.
  +    */
  +   public synchronized ServerInvocationHandler getInvocationHandler(String subsystem)
      {
  -      if (config == null)
  -         return ssf;
  +      return (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
  +   }
   
  -      Object o = config.get(Remoting.SOCKET_CREATION_SERVER_LISTENER);
  +   public Object invoke(Object invoke) throws IOException
  +   {
  +      InvocationRequest request = null;
  +      InvocationResponse response = null;
   
  -      if (o == null)
  -         return ssf;
  +      if(trace) { log.trace("server received invocation " + invoke); }
   
  -      if (o instanceof SocketCreationListener)
  -      {
  -         return new CreationListenerServerSocketFactory(ssf, (SocketCreationListener) o);
  -      }
  -      else if (o instanceof String)
  +      if(invoke != null && invoke instanceof InvocationRequest)
         {
  +         request = (InvocationRequest) invoke;
            try
            {
  -            Class c = ClassLoaderUtility.loadClass((String) o, ServerInvoker.class);
  -            SocketCreationListener listener = (SocketCreationListener) c.newInstance();
  -            return new CreationListenerServerSocketFactory(ssf, listener);
  +
  +            Object result = invoke(request);
  +
  +            response = new InvocationResponse(request.getSessionId(),
  +                                              result, false, request.getReturnPayload());
  +
            }
  -         catch (Exception e)
  +         catch(Throwable throwable)
            {
  -            log.error("unable to instantiate class: " + o, e);
  -            return ssf;
  +            response = new InvocationResponse(request.getSessionId(),
  +                                              throwable, true, request.getReturnPayload());
            }
         }
         else
         {
  -         log.error("unrecognized type for socket creation server listener: " + o);
  -         return ssf;
  +         log.error("server invoker received " + invoke + " as invocation. " +
  +            "Must not be null and must be of type InvocationRequest.");
  +
  +         Exception e = new Exception("Error processing invocation request on " + getLocator() +
  +            ". Either invocation was null or of wrong type.");
  +
  +         response =
  +            new InvocationResponse(request.getSessionId(), e, true, request.getReturnPayload());
         }
  +      return response;
      }
   
  -
      /**
  -    * Sets timeout (in millseconds) to be used for the socket connection.
  -    * @param timeout
  +    * Processes invocation request depending on the invocation type (internal, name based, oneway,
  +    * etc). Can be called on directly when client and server are local to one another (by-passing
  +    * serialization).
       */
  -   public void setTimeout(int timeout)
  +   public Object invoke(InvocationRequest invocation) throws Throwable
      {
  -      this.timeout = timeout;
  -   }
  +      if (isStarted())
  +      {
  +         Object param = invocation.getParameter();
  +         Object result = null;
   
  -   /**
  -    * The timeout (in milliseconds) used for the socket connection.
  -    */
  -   public int getTimeout()
  +         // check to see if this is a is alive ping
  +         if ("$PING$".equals(param))
      {
  -      return timeout;
  +            //if checking lease, need to update lease flag
  +            if (leaseManagement)
  +            {
  +               //NOTE we only update the lease when we receive a PING, not for
  +               //all invocations
  +               updateClientLease(invocation);
      }
   
  -   public boolean isLeaseActivated()
  -   {
  -      return leaseManagement;
  +            // if this is an invocation ping, just pong back
  +            Map responseMap = new HashMap();
  +            responseMap.put(CLIENT_LEASE_PERIOD, new Long(leasePeriod));
  +            return new InvocationResponse(invocation.getSessionId(), new Boolean(leaseManagement),
  +                                          false, responseMap);
      }
   
  -   public void addConnectionListener(ConnectionListener listener)
  +         if ("$DISCONNECT$".equals(param))
      {
  -      if(listener != null)
  +            if (leaseManagement)
         {
  -         connectionNotifier.addListener(listener);
  +               terminateLease(invocation);
  +            }
  +            return null;
  +         }
   
  -         if(leasePeriod > 0)
  +         //TODO: -TME both oneway and internal invocation will be broken since have not
  +         // deserialized the para yet (removed ClassUtil.deserialize() so would let handler do it).
  +         if (param instanceof OnewayInvocation)
            {
  -            leaseManagement = true;
  +            // no point in delaying return to client if oneway
  +            handleOnewayInvocation((OnewayInvocation)param, invocation);
            }
  +         else
  +         {
  +            String subsystem = invocation.getSubsystem();
  +            String clientId = invocation.getSessionId();
  +
  +            // too bad we can't optimize this a little better, since we take a lookup hit for
  +            // each invocation -JGH
  +            ServerInvocationHandler handler = null;
  +            if (subsystem != null)
  +            {
  +               handler = (ServerInvocationHandler)handlers.get(subsystem.toUpperCase());
         }
         else
         {
  -         throw new IllegalArgumentException("Can not add null ConnectionListener.");
  +               // subsystem not specified, so will hope for a default one being set
  +               if (!handlers.isEmpty())
  +               {
  +                  handler = (ServerInvocationHandler)handlers.values().iterator().next();
         }
      }
   
  -   public void removeConnectionListener(ConnectionListener listener)
  +            if (param instanceof InternalInvocation)
      {
  -      if(connectionNotifier != null)
  +               result = handleInternalInvocation((InternalInvocation)param, invocation, handler);
  +            }
  +            else
         {
  -         connectionNotifier.removeListener(listener);
  +               if (trace) { log.trace(this + " dispatching " + invocation + " from client " + clientId + " to subsystem '" + subsystem + "'"); }
   
  -         // turn off lease management if no listeners (since no one to tell client died)
  -         if(connectionNotifier.size() == 0)
  +               if (handler == null)
            {
  -            leaseManagement = false;
  +                  throw new InvalidConfigurationException(
  +                     "Can not handle invocation request for subsystem(" + subsystem + ") because " +
  +                     "there are no matching ServerInvocationHandlers registered. Please add via " +
  +                     "xml configuration or via the Connector's addInvocationHandler() method.");
  +               }
  +               result = handler.invoke(invocation);
  +            }
   
  -            // go through any existing leases and terminate them
  -            Set clientKeys = clientLeases.keySet();
  -            Iterator itr = clientKeys.iterator();
  -            while(itr.hasNext())
  -            {
  -               String sessionId = (String)itr.next();
  -               Lease clientLease = (Lease)clientLeases.get(sessionId);
  -               clientLease.terminateLease(sessionId);
  +            if (trace) { log.trace(this + " successfully dispatched invocation, returning " + result + " from subsystem '" + subsystem + "' to client " + clientId); }
               }
  -            clientLeases.clear();
  +
  +         return result;
            }
  +      else
  +      {
  +         log.warn(this + " can not process invocation requests since is not in started state!");
  +         throw new InvalidStateException(
  +            "Can not process invocation request since is not in started state.");
         }
      }
   
      /**
  -    * Sets the amount of time (in milliseconds) that a client should renew its lease. If this value
  -    * is not set, the default of five seconds (see DEFAULT_CLIENT_LEASE_PERIOD), will be used.
  -    * This value will also be what is given to the client when it initially querys server for
  -    * leasing information. If set after create() method called, this value will override value set
  -    * by CLIENT_LEASE_PERIOD key.
  +    * Will get the data type for the marshaller factory so know which marshaller to get to marshal
  +    * the data. Will first check the locator uri for a 'datatype' parameter and take that value if
  +    * it exists. Otherwise, will use the default datatype for the client invoker, based on
  +    * transport.
       */
  -   public void setLeasePeriod(long leasePeriodValue)
  +   public String getDataType()
      {
  -      this.leasePeriod = leasePeriodValue;
  -
  -      if (leasePeriod <= 0)
  +      if(dataType == null)
         {
  -    	  this.leaseManagement = false;
  +         dataType = getDataType(getLocator());
  +         if(dataType == null)
  +         {
  +            dataType = getDefaultDataType();
         }
  -      else
  +      }
  +      return dataType;
  +   }
  +
  +   public void create()
         {
  -         if(connectionNotifier != null && connectionNotifier.size() > 0)
  +      if(!created)
            {
  -            this.leaseManagement = true;
  +         try
  +         {
  +            setup();
  +         }
  +         catch(Exception e)
  +         {
  +            throw new RuntimeException("Error setting up server invoker " + this, e);
            }
  +         created = true;
         }
      }
   
      /**
  -    * Gets the amount of time (in milliseconds) that a client should renew its lease.
  +    * Subclasses should override to provide any specific start logic.
       */
  -   public long getLeasePeriod()
  +   public void start() throws IOException
      {
  -      return leasePeriod;
  +      started = true;
  +      log.debug(this + " started for locator " + getLocator());
      }
   
      /**
  -    * @jmx:managed-attribute
  +    * @return true if the server invoker is started, false if not.
       */
  -   public String getClientConnectAddress()
  +   public boolean isStarted()
      {
  -      return clientConnectAddress;
  -   }
  -
  -   public int getClientConnectPort()
  -   {
  -      return clientConnectPort;
  -   }
  -
  -   public void setClientConnectPort(int clientConnectPort)
  -   {
  -      this.clientConnectPort = clientConnectPort;
  +      return started;
      }
   
      /**
  -    * This method should only be called by the service controller when this invoker is
  -    * specified within the Connector configuration of a service xml.  Calling this directly
  -    * will have no effect, as will be used in building the locator uri that is published
  -    * for detection and this happens when the invoker is first created and started (after that, no one
  -    * will be aware of a change).
  -    *
  -    * @jmx:managed-attribute
  +    * Subclasses should override to provide any specific stop logic.
       */
  -   public void setClientConnectAddress(String clientConnectAddress)
  +   public void stop()
      {
  -      this.clientConnectAddress = clientConnectAddress;
  -   }
  -
  +      started = false;
   
  -   public String getServerBindAddress()
  +      for(Iterator i = callbackHandlers.values().iterator(); i.hasNext(); )
      {
  -      return serverBindAddress;
  +         ServerInvokerCallbackHandler callbackHandler = (ServerInvokerCallbackHandler)i.next();
  +         callbackHandler.destroy();
      }
   
  -   public int getServerBindPort()
  -   {
  -      return serverBindPort;
  +      log.debug(this + " stopped");
      }
   
      /**
  -    * Sets the maximum number of thread to be used in the thread pool for one way invocations (server side).
  -    * This property is only used when the default oneway thread pool is used.
  -    * If set after create() method called, this value will override value set by MAX_NUM_ONEWAY_THREADS_KEY key.
  -    * @param numOfThreads
  +    * Destory the invoker permanently.
       */
  -   public void setMaxNumberOfOnewayThreads(int numOfThreads)
  +   public void destroy()
      {
  -      this.maxNumberThreads = numOfThreads;
  +      if(classbyteloader != null)
  +      {
  +         classbyteloader.destroy();
  +      }
      }
   
      /**
  -    * Gets the maximum number of thread to be used in the thread pool for one way invocations (server side).
  -    * @return
  +    * Sets the server invoker's transport specific configuration. Will need to set before calling
  +    * start() method (or at least stop() and start() again) before configurations will take affect.
       */
  -   public int getMaxNumberOfOnewayThreads()
  +   public void setConfiguration(Map configuration)
      {
  -      return this.maxNumberThreads;
  +      this.configuration = configuration;
      }
   
      /**
  -    * Gets the oneway thread pool to use.
  -    * @return
  +    * Gets the server invoker's transport specific configuration.
       */
  -   public ThreadPool getOnewayThreadPool()
  +   public Map getConfiguration()
      {
  -      if(onewayThreadPool == null)
  +      return configuration;
  +   }
  +
  +   public void removeCallbackListener(String subsystem, InvokerCallbackHandler callbackHandler)
         {
  -         // if no thread pool class set, then use default BasicThreadPool
  -         if(onewayThreadPoolClass == null || onewayThreadPoolClass.length() == 0)
  +      ServerInvocationHandler handler = null;
  +      if(subsystem != null)
            {
  -            BasicThreadPool pool = new BasicThreadPool("JBossRemoting Client Oneway");
  -            pool.setMaximumPoolSize(maxNumberThreads);
  -            pool.setBlockingMode(BlockingMode.WAIT);
  -            onewayThreadPool = pool;
  +         handler = (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
            }
            else
            {
  -            //first check to see if this is an ObjectName
  -            boolean isObjName = false;
  -            try
  +         // subsystem not specified, so will hope for a default one being set
  +         if(!handlers.isEmpty())
               {
  -               ObjectName objName = new ObjectName(onewayThreadPoolClass);
  -               onewayThreadPool = createThreadPoolProxy(objName);
  -               isObjName = true;
  +            handler = (ServerInvocationHandler) handlers.values().iterator().next();
               }
  -            catch(MalformedObjectNameException e)
  -            {
  -               log.debug("Thread pool class supplied is not an object name.");
  +      }
  +      handler.removeListener(callbackHandler);
               }
   
  -            if(!isObjName)
  +   /**
  +    * @return the String for the object name to be used for the invoker.
  +    */
  +   public String getMBeanObjectName()
               {
  -               try
  +      InvokerLocator locator = getLocator();
  +      StringBuffer buffer =
  +         new StringBuffer("jboss.remoting:service=invoker,transport= " + locator.getProtocol());
  +      buffer.append(",host=").append(locator.getHost());
  +      buffer.append(",port=").append(locator.getPort());
  +      Map param = locator.getParameters();
  +      if(param != null)
                  {
  -                  onewayThreadPool = (ThreadPool) Class.forName(onewayThreadPoolClass, false, getClassLoader()).newInstance();
  -               }
  -               catch(Exception e)
  +         Iterator itr = param.keySet().iterator();
  +         while(itr.hasNext())
                  {
  -                  throw new RuntimeException("Error loading instance of ThreadPool based on class name: " + onewayThreadPoolClass);
  +            buffer.append(",");
  +            String key = (String) itr.next();
  +            String value = (String) param.get(key);
  +            buffer.append(key);
  +            buffer.append("=");
  +            buffer.append(value);
                  }
               }
  -
  +      return buffer.toString();
            }
   
  -      }
  -      return onewayThreadPool;
  -   }
  +   // Package protected ----------------------------------------------------------------------------
   
  -   /**
  -    * Sets the oneway thread pool to use.
  -    * @param pool
  -    */
  -   public void setOnewayThreadPool(ThreadPool pool)
  +   // Protected ------------------------------------------------------------------------------------
  +
  +   protected abstract String getDefaultDataType();
  +
  +   protected void setup() throws Exception
      {
  -      this.onewayThreadPool = pool;
  -   }
  +      Map config = getConfiguration();
  +      String maxNumOfThreads = (String)config.get(MAX_NUM_ONEWAY_THREADS_KEY);
   
  -   private ThreadPool createThreadPoolProxy(ObjectName objName)
  +      if(maxNumOfThreads != null && maxNumOfThreads.length() > 0)
      {
  -      ThreadPool pool;
  -      MBeanServer server = getMBeanServer();
  -      if(server != null)
  +         try
         {
  -         ThreadPoolMBean poolMBean = (ThreadPoolMBean)
  -               MBeanServerInvocationHandler.newProxyInstance(server,
  -                                                             objName,
  -                                                             ThreadPoolMBean.class,
  -                                                             false);
  -         pool = poolMBean.getInstance();
  +            maxNumberThreads = Integer.parseInt(maxNumOfThreads);
         }
  -      else
  +         catch(NumberFormatException e)
         {
  -         throw new RuntimeException("Can not register MBean ThreadPool as the ServerInvoker has not been registered with a MBeanServer.");
  +            log.error("Can not convert max number of threads value (" +
  +                       maxNumOfThreads + ") into a number.");
         }
  -      return pool;
      }
   
  +      onewayThreadPoolClass = (String)config.get(ONEWAY_THREAD_POOL_CLASS_KEY);
   
  -   public MBeanServer getMBeanServer()
  +      String locatorHost = locator.getHost();
  +      InetAddress addr = null;
  +      if(locatorHost != null)
      {
  -      return mbeanServer;
  +         addr = InetAddress.getByName(locator.getHost());
      }
  -
  -   public void setMBeanServer(MBeanServer server)
  +      else
      {
  -      /**
  -       * This has been added in order to support mbean service configuration.
  -       * Now supporting classes, such as the ServerInvokerCallbackHandler can find and use
  -       * resources such as CallbackStore, which can be run as a service mbean (and specified
  -       * via object name within config).  The use of JMX throughout remoting is a problem as
  -       * now have to tie it in all throughout the code for service configuration as is being
  -       * done here.  When migrate to use under new server model, which does not depend on JMX,
  -       * can rip out code such as this.
  -       */
  -      this.mbeanServer = server;
  +         addr = InetAddress.getLocalHost();
      }
  -
  -   /**
  -    * return true if a server invocation handler has been registered for this subsystem
  -    *
  -    * @param subsystem
  -    * @return
  -    */
  -   public synchronized boolean hasInvocationHandler(String subsystem)
  +      int port = locator.getPort();
  +      if(port <= 0)
      {
  -      return handlers.containsKey(subsystem);
  +         port = assignPort();
      }
   
  -   /**
  -    * Return array of keys for each subsystem this invoker can handle.
  -    */
  -   public synchronized String[] getSupportedSubsystems()
  +      // set the bind address
  +      serverBindAddress = (String)config.get(SERVER_BIND_ADDRESS_KEY);
  +      clientConnectAddress = (String)config.get(CLIENT_CONNECT_ADDRESS_KEY);
  +      if(serverBindAddress == null)
      {
  -      String subsystems [] = new String[handlers.size()];
  -      return (String[]) handlers.keySet().toArray(subsystems);
  -   }
  -
  -   /**
  -    * Return an array of the server invocation handlers.
  -    */
  -   public synchronized ServerInvocationHandler[] getInvocationHandlers()
  +         if(clientConnectAddress != null)
      {
  -      ServerInvocationHandler ih [] = new ServerInvocationHandler[handlers.size()];
  -      return (ServerInvocationHandler[]) handlers.values().toArray(ih);
  +            // can't use uri address, as is for client only
  +            serverBindAddress = InetAddress.getLocalHost().getHostAddress();
      }
  -
  -   /**
  -    * Add a server invocation handler for a particular subsystem.  Typically, subsystems are defined
  -    * in org.jboss.remoting.Subsystem, however, this can be any string that the caller knows about.
  -    *
  -    * @return previous ServerInvocationHandler with the same sybsystem value (case insensitive) or
  -    *         null if a previous one did not exist.
  -    */
  -   public synchronized ServerInvocationHandler addInvocationHandler(String subsystem,
  -                                                                    ServerInvocationHandler handler)
  +         else
      {
  -      handler.setInvoker(this);
  -      ServerInvocationHandler oldHandler =
  -         (ServerInvocationHandler)handlers.put(subsystem.toUpperCase(), handler);
  -
  -      log.debug(this + " added " + handler + " for subsystem '" + subsystem + "'" +
  -         (oldHandler == null ? "" : ", replacing old handler " + oldHandler));
  -
  -      return oldHandler;
  +            serverBindAddress = addr.getHostAddress();
      }
  -
  -   /**
  -    * Remove a subsystem invocation handler.
  -    */
  -   public synchronized ServerInvocationHandler removeInvocationHandler(String subsystem)
  -   {
  -      ServerInvocationHandler handler =
  -         (ServerInvocationHandler)handlers.remove(subsystem.toUpperCase());
  -
  -      log.debug(this + (handler == null ?
  -         " tried to remove handler for " + subsystem + " but no handler found" :
  -         " removed handler " + handler + " for subsystem '" + subsystem + "'"));
  -
  -      return handler;
      }
   
  -   /**
  -    * Get a ServerInvocationHandler for a given subsystem type.
  -    */
  -   public synchronized ServerInvocationHandler getInvocationHandler(String subsystem)
  +      // set the bind port
  +      String serverBindPortString = (String)config.get(SERVER_BIND_PORT_KEY);
  +      String clientConnectPortString = (String)config.get(CLIENT_CONNECT_PORT_KEY);
  +      if(clientConnectPortString != null)
      {
  -      return (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
  +         try
  +         {
  +            clientConnectPort = Integer.parseInt(clientConnectPortString);
      }
  -
  -   public Object invoke(Object invoke) throws IOException
  +         catch(NumberFormatException e)
      {
  -      InvocationRequest request = null;
  -      InvocationResponse response = null;
  -
  -      if(trace) { log.trace("server received invocation " + invoke); }
  -
  -      if(invoke != null && invoke instanceof InvocationRequest)
  +            throw new InvalidConfigurationException("Can not set client bind port because can " +
  +               "not convert given value (" + clientConnectPortString + ") to a number.");
  +         }
  +      }
  +      if(serverBindPortString != null)
         {
  -         request = (InvocationRequest) invoke;
            try
            {
  -
  -            Object result = invoke(request);
  -
  -            response = new InvocationResponse(request.getSessionId(),
  -                                              result, false, request.getReturnPayload());
  +            serverBindPort = Integer.parseInt(serverBindPortString);
  +            if(serverBindPort <= 0)
  +            {
  +               serverBindPort = assignPort();
  +            }
   
            }
  -         catch(Throwable throwable)
  +         catch(NumberFormatException e)
            {
  -            response = new InvocationResponse(request.getSessionId(),
  -                                              throwable, true, request.getReturnPayload());
  +            throw new InvalidConfigurationException("Can not set server bind port because can " +
  +               "not convert given value (" + serverBindPortString + ") to a number.");
            }
         }
         else
         {
  -         log.error("server invoker received " + invoke + " as invocation.  Must not be null and must be of type InvocationRequest.");
  -         response = new InvocationResponse(request.getSessionId(),
  -                                           new Exception("Error processing invocation request on " + getLocator() + ".  Either invocation was null or of wrong type."),
  -                                           true, request.getReturnPayload());
  +         if(clientConnectPort > 0)
  +         {
  +            // can't use uri port, as is for client only
  +            serverBindPort = PortUtil.findFreePort(locator.getHost());
  +         }
  +         else
  +         {
  +            serverBindPort = port;
         }
  -      return response;
  -
      }
   
  -   /**
  -    * Will get the data type for the marshaller factory so know which marshaller to
  -    * get to marshal the data.  Will first check the locator uri for a 'datatype'
  -    * parameter and take that value if it exists.  Otherwise, will use the
  -    * default datatype for the client invoker, based on transport.
  -    *
  -    * @return
  -    */
  -   public String getDataType()
  -   {
  -      if(dataType == null)
  +      // get timeout config
  +      String timeoutPeriod = (String)config.get(TIMEOUT);
  +      if(timeoutPeriod != null && timeoutPeriod.length() > 0)
         {
  -         dataType = getDataType(getLocator());
  -         if(dataType == null)
  +         try
            {
  -            dataType = getDefaultDataType();
  +            timeout = Integer.parseInt(timeoutPeriod);
            }
  +         catch(NumberFormatException e)
  +         {
  +            throw new InvalidConfigurationException("Can not set timeout because can not " +
  +               "convert give value (" + timeoutPeriod + ") to a number.");
         }
  -      return dataType;
      }
   
  -   //TODO: -TME getting of datatype is duplicated in both the RemoteClientInvoker and the ServerInvoker
  -   private String getDataType(InvokerLocator locator)
  -   {
  -      String type = null;
  -
  -      if(locator != null)
  +      // config for client lease period
  +      String clientLeasePeriod = (String)config.get(CLIENT_LEASE_PERIOD);
  +      if(clientLeasePeriod != null)
         {
  -         Map params = locator.getParameters();
  -         if(params != null)
  +         try
            {
  -            type = (String) params.get(InvokerLocator.DATATYPE);
  +            long leasePeriodValue = Long.parseLong(clientLeasePeriod);
  +            setLeasePeriod(leasePeriodValue);
            }
  +         catch(NumberFormatException e)
  +         {
  +            throw new InvalidConfigurationException("Can not set client lease period because " +
  +               "can not convert given value (" + clientLeasePeriod + ") to a number.");
         }
  -      return type;
      }
   
  -   protected abstract String getDefaultDataType();
  +      createServerSocketFactory();
  +   }
   
  -   /**
  -    * Processes invocation request depending on the invocation type (internal, name based, oneway, etc).
  -    * Can be called on directly when client and server are local to one another (by-passing serialization)
  -    *
  -    * @param invocation
  -    * @return
  -    * @throws Throwable
  -    */
  -   public Object invoke(InvocationRequest invocation) throws Throwable
  -   {
  -      if (isStarted())
  +   protected int assignPort() throws IOException
         {
  -         Object param = invocation.getParameter();
  -         Object result = null;
  +      int port;
  +      port = PortUtil.findFreePort(locator.getHost());
   
  -         // check to see if this is a is alive ping
  -         if ("$PING$".equals(param))
  -         {
  -            //if checking lease, need to update lease flag
  -            if (leaseManagement)
  -            {
  -               //NOTE we only update the lease when we receive a PING, not for
  -               //all invocations
  -               updateClientLease(invocation);
  -            }
  +      // re-write locator since the port is different
  +      InvokerLocator newLocator = new InvokerLocator(locator.getProtocol(), locator.getHost(), port,
  +                                                     locator.getPath(), locator.getParameters());
   
  -            // if this is an invocation ping, just pong back
  -            Map responseMap = new HashMap();
  -            responseMap.put(CLIENT_LEASE_PERIOD, new Long(leasePeriod));
  -            return new InvocationResponse(invocation.getSessionId(), new Boolean(leaseManagement), false, responseMap);
  +      // need to update the locator key used in the invoker registry
  +      InvokerRegistry.updateServerInvokerLocator(locator, newLocator);
  +      this.locator = newLocator;
  +      return port;
            }
   
  -         if ("$DISCONNECT$".equals(param))
  -         {
  -            if (leaseManagement)
  +   protected ServerSocketFactory createServerSocketFactory() throws IOException
               {
  -               terminateLease(invocation);
  -            }
  -            return null;
  -         }
  -
  -         //TODO: -TME both oneway and internal invocation will be broken since have not
  -         // deserialized the para yet (removed ClassUtil.deserialize() so would let handler do it).
  -         if (param instanceof OnewayInvocation)
  +      // only want to look at config if server socket factory has not already been set
  +      if(serverSocketFactory == null)
            {
  -            handleOnewayInvocation((OnewayInvocation) param, invocation);
  -         }
  -         else // no point in delaying return to client if oneway
  +         Object obj = configuration.get(Remoting.CUSTOM_SERVER_SOCKET_FACTORY);
  +         if (obj != null)
            {
  -            String subsystem = invocation.getSubsystem();
  -            String clientId = invocation.getSessionId();
  -
  -            // too bad we can't optimize this a little better, since we take a lookup hit for
  -            // each invocation -JGH
  -            ServerInvocationHandler handler = null;
  -            if (subsystem != null)
  +            if (obj instanceof ServerSocketFactory)
               {
  -               handler = (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
  +               serverSocketFactory = (ServerSocketFactory)obj;
               }
               else
               {
  -               // subsystem not specified, so will hope for a default one being set
  -               if (!handlers.isEmpty())
  -               {
  -                  handler = (ServerInvocationHandler) handlers.values().iterator().next();
  +               throw new RuntimeException("Can not set custom server socket factory (" + obj +
  +                                          ") as is not of type javax.net.SocketFactory");
                  }
               }
  -            if (param instanceof InternalInvocation)
  +
  +         if (serverSocketFactory == null)
               {
  -               result = handleInternalInvocation((InternalInvocation) param, invocation, handler);
  +            // TODO: -TME This is another big hack because of dependancy on JMX within configuration
  +            // Have to wait till the mbean server is set before can actually set the server socket
  +            // factory since it is an mbean (new server's DI will fix all this). Would prefer this
  +            // to be in the setup() method...
  +            // Also, I can't cast the mbean proxy directly to ServerSocketFactory because it is not
  +            // an interface. Therefore, have to require that ServerSocketFactoryMBean is used.
   
  -            }
  -            else
  +            String serverSocketFactoryString = (String)configuration.get(SERVER_SOCKET_FACTORY);
  +            if(serverSocketFactoryString != null && serverSocketFactoryString.length() > 0)
               {
  -               if (trace) { log.trace(this + " dispatching " + invocation + " from client " + clientId + " to subsystem '" + subsystem + "'"); }
  +               try
  +               {
  +                  if(serverSocketFactoryString != null)
  +                  {
  +                     MBeanServer server = getMBeanServer();
  +                     ObjectName serverSocketFactoryObjName =
  +                        new ObjectName(serverSocketFactoryString);
   
  -               if (handler == null)
  +                     if(server != null)
                  {
  -                  throw new InvalidConfigurationException("Can not handle invocation request for subsystem("+subsystem+") because there are no " +
  -                                                          "matching ServerInvocationHandlers registered.  Please add via xml configuration " +
  -                                                          "or via the Connector's addInvocationHandler() method.");
  -               }
  -               result = handler.invoke(invocation);
  +                        try
  +                        {
  +                           ServerSocketFactoryMBean serverSocketFactoryMBean =
  +                              (ServerSocketFactoryMBean)MBeanServerInvocationHandler.
  +                                 newProxyInstance(server, serverSocketFactoryObjName,
  +                                                  ServerSocketFactoryMBean.class, false);
  +                           serverSocketFactory =
  +                              new ServerSocketFactoryWrapper(serverSocketFactoryMBean);
               }
  -
  -            if (trace) { log.trace(this + " successfully dispatched invocation, returning " + result + " from subsystem '" + subsystem + "' to client " + clientId); }
  +                        catch(Exception e)
  +                        {
  +                           log.debug("Error creating mbean proxy for server socket factory " +
  +                              "for object name " + serverSocketFactoryObjName + ". " +
  +                              "Will try by class name.");
            }
  -
  -         return result;
         }
         else
         {
  -         log.warn(this + " can not process invocation requests since is not in started state!");
  -         throw new InvalidStateException("Can not process invocation request since is not in started state.");
  +                        log.debug("The 'serverSocketFactory' attribute was set with a value, " +
  +                           "but the MBeanServer reference is null.");
         }
      }
  -
  -   private void terminateLease(InvocationRequest invocation)
  -   {
  -      if (invocation != null)
  +               }
  +               catch(MalformedObjectNameException e)
         {
  -         String clientSessionId = invocation.getSessionId();
  -         Lease clientLease = (Lease) clientLeases.get(clientSessionId);
  -         if (clientLease != null)
  +                  log.debug("Attibute value (" + serverSocketFactoryString + ") passed is not a " +
  +                     "valid ObjectName. Can not look up if is a mbean service. Will try by classname.");
  +               }
  +               catch(NullPointerException e)
            {
  -            boolean clientOnlyTerminated = false;
  -            // now have to determine if is just Client that disconnected
  -            // or if all Clients disconnected, thus the client invoker
  -            // is also disconnected as well.
  -            Map reqMap = invocation.getRequestPayload();
  -            if (reqMap != null)
  +                  log.debug("Could not set up the server socket factory as a mbean service " +
  +                     "due to null pointer exception.");
  +               }
  +
  +               // couldn't create from object name for mbean service, will try by class name
  +               if(serverSocketFactory == null)
               {
  -               Object holderObj = reqMap.get(ClientHolder.CLIENT_HOLDER_KEY);
  -               if (holderObj != null && holderObj instanceof ClientHolder)
  +                  try
                  {
  -                  // just a client that disconnected, so only need to terminate lease for
  -                  // that particular client (by client session id).
  -                  ClientHolder holder = (ClientHolder) holderObj;
  -                  clientLease.terminateLease(holder.getSessionId());
  -                  clientOnlyTerminated = true;
  -               }
  -            }
  +                     Class cl = ClassLoaderUtility.loadClass(serverSocketFactoryString, getClass());
   
  -            // now see if client invoker needs to be terminated
  -            if (!clientOnlyTerminated)
  +                     Constructor serverSocketConstructor = null;
  +                     serverSocketConstructor = cl.getConstructor(new Class[]{});
  +                     serverSocketFactory =
  +                        (ServerSocketFactory)serverSocketConstructor.newInstance(new Object[] {});
  +                     log.trace("ServerSocketFactory (" + serverSocketFactoryString + ") loaded");
  +                  }
  +                  catch(Exception e)
               {
  -               clientLease.terminateLease(clientSessionId);
  -               clientLeases.remove(clientSessionId);
  +                     log.debug("Could not create server socket factory by classname (" +
  +                        serverSocketFactoryString + ").  Error message: " + e.getMessage());
               }
            }
  -         else
  -         {
  -            log.warn("Asked to terminate lease for client session id " + clientSessionId + ", but lease for this id could not be found.");
            }
         }
      }
   
  -   private void updateClientLease(InvocationRequest invocation)
  -   {
  -      if(invocation != null)
  +      if (serverSocketFactory == null && needsCustomSSLConfiguration(configuration))
         {
  -         String clientSessionId = invocation.getSessionId();
  -         if(clientSessionId != null)
  +         try
            {
  -            if(trace)
  +            SSLSocketBuilder socketBuilder = new SSLSocketBuilder(configuration);
  +            socketBuilder.setUseSSLServerSocketFactory(false);
  +            serverSocketFactory = socketBuilder.createSSLServerSocketFactory();
  +         }
  +         catch (IOException e)
               {
  -               log.trace("Getting lease for client session id: " + clientSessionId);
  +            throw new RuntimeException("Unable to create customized SSL socket factory", e);
  +         }
               }
   
  -            Lease clientLease = (Lease)clientLeases.get(clientSessionId);
  -            if(clientLease == null)
  +      if(serverSocketFactory == null)
               {
  -               Lease newClientLease = new Lease(clientSessionId, leasePeriod,
  -                                                locator.getLocatorURI(), invocation.getRequestPayload(),
  -                                                connectionNotifier,
  -                                                clientLeases);
  -               clientLeases.put(clientSessionId, newClientLease);
  -               newClientLease.startLease();
  +         log.debug(this + " did not find server socket factory configuration as mbean service " +
  +            "or classname. Creating default server socket factory.");
   
  -               if(trace)
  -               {
  -                  log.trace("No lease established for client session id (" + clientSessionId + "), so starting a new one.");
  +         serverSocketFactory = getDefaultServerSocketFactory();
                  }
  +
  +      log.debug(this + " created server socket factory " + serverSocketFactory);
  +
  +      serverSocketFactory = wrapServerSocketFactory(serverSocketFactory, configuration);
  +      return serverSocketFactory;
  +
               }
  -            else
  -            {
  -               // including request payload from invocation as may contain updated list of clients.
  -               clientLease.updateLease(leasePeriod, invocation.getRequestPayload());
   
  -               if(trace)
  +   protected boolean justNeedsSSLClientMode(Map configuration)
                  {
  -                  log.trace("Updated lease for client session id (" + clientSessionId + ")");
  -               }
  +      if (configuration.size() == 1 &&
  +         configuration.containsKey(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE))
  +      {
  +         String useClientModeString =
  +            (String)configuration.get(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE);
  +         return Boolean.valueOf(useClientModeString).booleanValue();
               }
  +
  +      if (configuration.size() == 1 &&
  +         configuration.containsKey(SSLSocketBuilder.REMOTING_SOCKET_USE_CLIENT_MODE))
  +      {
  +         String useClientModeString =
  +            (String)configuration.get(SSLSocketBuilder.REMOTING_SOCKET_USE_CLIENT_MODE);
  +         return Boolean.valueOf(useClientModeString).booleanValue();
            }
  +
  +      if (configuration.size() == 2
  +            && configuration.containsKey(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE)
  +            && configuration.containsKey(SSLSocketBuilder.REMOTING_SOCKET_USE_CLIENT_MODE))
  +      {
  +         String useClientModeString =
  +            (String)configuration.get(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE);
  +         return Boolean.valueOf(useClientModeString).booleanValue();
         }
  +
  +      return false;
      }
   
      /**
  -    * Takes the real invocation from the client out of the OnewayInvocation and then executes the invoke()
  -    * with the real invocation on a seperate thread.
  -    *
  -    * @param onewayInvocation
  -    * @param invocation
  -    * @throws Throwable
  +    * Gets the default server socket factory to use for the server invoker. The intention is this
  +    * method will be overridden by sub-classes for their specific defaults.
       */
  -   private void handleOnewayInvocation(OnewayInvocation onewayInvocation, InvocationRequest invocation) throws Throwable
  +   protected ServerSocketFactory getDefaultServerSocketFactory() throws IOException
      {
  -      Object[] objs = onewayInvocation.getParameters();
  -      // The oneway invocation should contain the real param as it's only param in parameter array
  -      Object realParam = objs[0];
  -      invocation.setParameter(realParam);
  -      final InvocationRequest newInvocation = invocation;
  +      return ServerSocketFactory.getDefault();
  +   }
   
  -      ThreadPool executor = getOnewayThreadPool();
  -      Runnable onewayRun = new Runnable()
  +   protected ServerSocketFactory wrapServerSocketFactory(ServerSocketFactory ssf, Map config)
         {
  -         public void run()
  +      if (config == null)
  +      {
  +         return ssf;
  +      }
  +
  +      Object o = config.get(Remoting.SOCKET_CREATION_SERVER_LISTENER);
  +
  +      if (o == null)
  +      {
  +         return ssf;
  +      }
  +
  +      if (o instanceof SocketCreationListener)
  +      {
  +         return new CreationListenerServerSocketFactory(ssf, (SocketCreationListener) o);
  +      }
  +      else if (o instanceof String)
            {
               try
               {
  -               invoke(newInvocation);
  +            Class c = ClassLoaderUtility.loadClass((String) o, ServerInvoker.class);
  +            SocketCreationListener listener = (SocketCreationListener)c.newInstance();
  +            return new CreationListenerServerSocketFactory(ssf, listener);
               }
  -            catch(Throwable e)
  +         catch (Exception e)
               {
  -               // throw away exception since can't get it back to original caller
  -               log.error("Error executing server oneway invocation request: " + newInvocation, e);
  +            log.error("unable to instantiate class: " + o, e);
  +            return ssf;
               }
            }
  -      };
  -      executor.run(onewayRun);
  +      else
  +      {
  +         log.error("unrecognized type for socket creation server listener: " + o);
  +         return ssf;
  +      }
      }
   
      /**
  -    * Handles both internal and external invocations (internal meaning only
  -    * to be used within remoting and external for ones that go to handlers.
  -    *
  -    * @param param
  -    * @param invocation
  -    * @param handler
  -    * @return
  -    * @throws Throwable
  +    * Handles both internal and external invocations (internal meaning only to be used within
  +    * remoting and external for ones that go to handlers.
       */
      protected Object handleInternalInvocation(InternalInvocation param,
                                              InvocationRequest invocation,
  -                                           ServerInvocationHandler handler)
  -         throws Throwable
  +                                             ServerInvocationHandler handler) throws Throwable
      {
         Object result = null;
         String methodName = param.getMethodName();
  -      if(trace)
  -      {
  -         log.trace("handling InternalInvocation where method name = " + methodName);
  -      }
  +
  +      if(trace) { log.trace("handling InternalInvocation where method name = " + methodName); }
  +
         // check if the invocation is for callback handling
         if(InternalInvocation.ADDLISTENER.equals(methodName))
         {
            if(handler == null)
            {
  -            throw new InvalidConfigurationException("Can not accept a callback listener since there are no " +
  -                                                    "ServerInvocationHandlers registered.  Please add via xml configuration " +
  -                                                    "or via the Connector's addInvocationHandler() method.");
  +            throw new InvalidConfigurationException(
  +               "Can not accept a callback listener since there are no ServerInvocationHandlers " +
  +               "registered. Please add via xml configuration or via the Connector's " +
  +               "addInvocationHandler() method.");
   
            }
            InvokerCallbackHandler callbackHandler = getCallbackHandler(invocation);
  @@ -1213,85 +1211,87 @@
            {
               if(handler == null)
               {
  -               throw new InvalidConfigurationException("Can not remove a callback listener since there are no " +
  -                                                       "ServerInvocationHandlers registered.  Please add via xml configuration " +
  -                                                       "or via the Connector's addInvocationHandler() method.");
  +               throw new InvalidConfigurationException(
  +                  "Can not remove a callback listener since there are no ServerInvocationHandlers " +
  +                  "registered.  Please add via xml configuration or via the Connector's " +
  +                  "addInvocationHandler() method.");
               }
               handler.removeListener(callbackHandler);
  -            if(trace)
  -            {
  -               log.trace("ServerInvoker (" + this + ") removing server callback handler " + callbackHandler + ".");
  -            }
  +
  +            if(trace) { log.trace("ServerInvoker (" + this + ") removing server callback handler " + callbackHandler + "."); }
   
               callbackHandler.destroy();
            }
            else
            {
               String sessionId = ServerInvokerCallbackHandler.getId(invocation);
  -            throw new RuntimeException("Can not remove callback listener from target server with id of " + sessionId + " as it does not exist as a registered callback listener.");
  +            throw new RuntimeException("Can not remove callback listener from target server with " +
  +               "id of " + sessionId + " as it does not exist as a registered callback listener.");
            }
         }
         else if(InternalInvocation.GETCALLBACKS.equals(methodName))
         {
            ServerInvokerCallbackHandler callbackHandler = getCallbackHandler(invocation);
  -         if(trace)
  -         {
  -            log.trace("ServerInvoker (" + this + ") getting callbacks for callback handler " + callbackHandler + ".");
  -         }
  -
  +         if(trace) { log.trace("ServerInvoker (" + this + ") getting callbacks for callback handler " + callbackHandler + "."); }
            result = callbackHandler.getCallbacks();
         }
         else if(InternalInvocation.ACKNOWLEDGECALLBACK.equals(methodName))
         {
            ServerInvokerCallbackHandler callbackHandler = getCallbackHandler(invocation);
  -         if(trace)
  -         {
  -            log.trace("ServerInvoker (" + this + ") acknowledge callback on callback handler " + callbackHandler + ".");
  -         }
  -
  +         if(trace) { log.trace("ServerInvoker (" + this + ") acknowledge callback on callback handler " + callbackHandler + "."); }
            callbackHandler.acknowledgeCallbacks(param);
         }
         else if(InternalInvocation.ADDCLIENTLISTENER.equals(methodName))
         {
            String sessionId = ServerInvokerCallbackHandler.getId(invocation);
            Object[] params = param.getParameters();
  +
            // the only elements should be the callback handler and possibly the callback handle object
            if(params == null || params.length < 0 || params.length > 3)
            {
               log.error("Recieved addClientListener InternalInvocation, but getParameters() " +
                         "returned: " + params);
  -            throw new RuntimeException("InvokerCallbackHandler and callback handle object (optional) must be supplied as the only " +
  -                                       "parameter objects within the InternalInvocation when " +
  -                                       "calling addClientListener.");
  +            throw new RuntimeException(
  +               "InvokerCallbackHandler and callback handle object (optional) must be supplied as " +
  +               "the only parameter objects within the InternalInvocation when calling " +
  +               "addClientListener.");
            }
  -         InvokerCallbackHandler callbackHandler = (InvokerCallbackHandler) params[0];
  +
  +         InvokerCallbackHandler callbackHandler = (InvokerCallbackHandler)params[0];
            Object callbackHandleObject = params[1];
  -         CallbackContainer callbackContainer = new CallbackContainer(callbackHandler, callbackHandleObject);
  +         CallbackContainer callbackContainer =
  +            new CallbackContainer(callbackHandler, callbackHandleObject);
            clientCallbackListener.put(sessionId, callbackContainer);
   
  -         log.debug("ServerInvoker (" + this + ") added client callback handler " + callbackHandler + " with session id of " + sessionId +
  -                   " and callback handle object of " + callbackHandleObject + ".");
  +         log.debug("ServerInvoker (" + this + ") added client callback handler " + callbackHandler +
  +            " with session id of " + sessionId + " and callback handle object of " +
  +            callbackHandleObject + ".");
   
         }
         else if(InternalInvocation.REMOVECLIENTLISTENER.equals(methodName))
         {
            String sessionId = ServerInvokerCallbackHandler.getId(invocation);
  -         log.debug("ServerInvoker (" + this + ") removing client callback handler with session id of " + sessionId + ".");
  +
  +         log.debug("ServerInvoker (" + this + ") removing client callback handler with session " +
  +            "id of " + sessionId + ".");
  +
            Object cbo = clientCallbackListener.remove(sessionId);
            if(cbo == null)
            {
  -            throw new RuntimeException("Can not remove callback listener from callback server with id of " + sessionId + " as it does not exist as a registered callback listener.");
  +            throw new RuntimeException(
  +               "Can not remove callback listener from callback server with id of " + sessionId +
  +               " as it does not exist as a registered callback listener.");
            }
   
         }
         else if(InternalInvocation.HANDLECALLBACK.equals(methodName))
         {
            String sessionId = ServerInvokerCallbackHandler.getId(invocation);
  -         if(trace)
  -         {
  -            log.trace("ServerInvoker (" + this + ") is being asked to deliver callback on client callback handler with session id of " + sessionId + ".");
  -         }
  -         CallbackContainer callbackContainer = (CallbackContainer) clientCallbackListener.get(sessionId);
  +         if(trace) { log.trace("ServerInvoker (" + this + ") is being asked to deliver callback on client callback handler with session id of " + sessionId + "."); }
  +
  +         CallbackContainer callbackContainer =
  +            (CallbackContainer) clientCallbackListener.get(sessionId);
  +
            if(callbackContainer != null && callbackContainer.getCallbackHandler() != null)
            {
               Object[] params = param.getParameters();
  @@ -1301,7 +1301,8 @@
               {
                  callbackHandleObject = new HashMap();
               }
  -            callbackHandleObject.put(Callback.CALLBACK_HANDLE_OBJECT_KEY, callbackContainer.getCallbackHandleObject());
  +            callbackHandleObject.put(Callback.CALLBACK_HANDLE_OBJECT_KEY,
  +                                     callbackContainer.getCallbackHandleObject());
               callbackRequest.setReturnPayload(callbackHandleObject);
               InvokerCallbackHandler callbackHandler = callbackContainer.getCallbackHandler();
               callbackHandler.handleCallback(callbackRequest);
  @@ -1311,227 +1312,244 @@
               log.error("Could not find callback handler to call upon for handleCallback " +
                         "where session id equals " + sessionId);
            }
  -
  -
         }
         else if(InternalInvocation.ADDSTREAMCALLBACK.equals(methodName))
         {
            StreamHandler streamHandler = getStreamHandler(invocation);
            if(handler instanceof StreamInvocationHandler)
            {
  -            InternalInvocation inv = (InternalInvocation) invocation.getParameter();
  +            InternalInvocation inv = (InternalInvocation)invocation.getParameter();
               // second parameter should be the param payload
  -            result = ((StreamInvocationHandler) handler).handleStream(streamHandler, (InvocationRequest)inv.getParameters()[1]);
  +            result = ((StreamInvocationHandler)handler).
  +               handleStream(streamHandler, (InvocationRequest)inv.getParameters()[1]);
            }
            else
            {
               log.error("Client request is an InputStream, but the registered handlers do not " +
                         "implement the StreamInvocationHandler interface, so could not process call.");
  -            throw new RuntimeException("No handler registered of proper type (StreamInvocationHandler).");
  +            throw new RuntimeException(
  +               "No handler registered of proper type (StreamInvocationHandler).");
            }
         }
         else
         {
            log.error("Error processing InternalInvocation.  Unable to process method " +
                      methodName + ".  Please make sure this should be an InternalInvocation.");
  -         throw new RuntimeException("Error processing InternalInvocation.  Unable to process method " +
  -                                    methodName);
  +         throw new RuntimeException(
  +            "Error processing InternalInvocation. Unable to process method " + methodName);
         }
         return result;
      }
   
  -   private StreamHandler getStreamHandler(InvocationRequest invocation) throws Exception
  +   /**
  +    * Called prior to an invocation.
  +    * TODO is sending in the arg appropriate?
  +    */
  +   protected void preProcess(String sessionId, ClassBytes arg, Map payload, InvokerLocator locator)
      {
  -      InternalInvocation inv = (InternalInvocation) invocation.getParameter();
  -      String locator = (String) inv.getParameters()[0];
  -      StreamHandler streamHandler = new StreamHandler(locator);
  -      //StreamHandler streamHandler = StreamHandler.createStreamHandler(locator);
  -      return streamHandler;
      }
   
  -   private ServerInvokerCallbackHandler getCallbackHandler(InvocationRequest invocation) throws Exception
  +   /**
  +    * Called after an invocation.
  +    * TODO is sending in the arg appropriate?
  +    */
  +   protected void postProcess(String sessionId, Object param, Map payload, InvokerLocator locator)
      {
  -      ServerInvokerCallbackHandler callbackHandler = null;
  -      String id = ServerInvokerCallbackHandler.getId(invocation);
  -      synchronized(callbackHandlers)
  +   }
  +
  +   // Private --------------------------------------------------------------------------------------
  +
  +   private ThreadPool createThreadPoolProxy(ObjectName objName)
         {
  -         callbackHandler = (ServerInvokerCallbackHandler) callbackHandlers.get(id);
  -         // if does not exist, create it
  -         if(callbackHandler == null)
  +      ThreadPool pool;
  +      MBeanServer server = getMBeanServer();
  +      if(server != null)
            {
  -            callbackHandler = new ServerInvokerCallbackHandler(invocation, getLocator(), this);
  -            callbackHandlers.put(id, callbackHandler);
  -         }
  +         ThreadPoolMBean poolMBean = (ThreadPoolMBean)MBeanServerInvocationHandler.
  +            newProxyInstance(server, objName, ThreadPoolMBean.class, false);
  +
  +         pool = poolMBean.getInstance();
         }
  -      if(trace)
  +      else
         {
  -         log.trace("ServerInvoker (" + this + ") adding server callback handler " + callbackHandler + " with id of " + id + ".");
  +         throw new RuntimeException("Can not register MBean ThreadPool as the ServerInvoker " +
  +            "has not been registered with a MBeanServer.");
         }
  -
  -      return callbackHandler;
  +      return pool;
      }
   
  -   private ServerInvokerCallbackHandler removeCallbackHandler(InvocationRequest invocation)
  +   //TODO: -TME getting of datatype is duplicated in both the RemoteClientInvoker and the ServerInvoker
  +   private String getDataType(InvokerLocator locator)
      {
  -      String id = ServerInvokerCallbackHandler.getId(invocation);
  -      ServerInvokerCallbackHandler callbackHandler = null;
  +      String type = null;
   
  -      synchronized(callbackHandlers)
  +      if(locator != null)
         {
  -         callbackHandler = (ServerInvokerCallbackHandler) callbackHandlers.remove(id);
  +         Map params = locator.getParameters();
  +         if(params != null)
  +         {
  +            type = (String) params.get(InvokerLocator.DATATYPE);
         }
  -      return callbackHandler;
      }
  -
  -
  -   /**
  -    * called prior to an invocation
  -    *
  -    * @param sessionId
  -    * @param payload
  -    * @param locator
  -    * @todo is sending in the arg appropriate???
  -    */
  -   protected void preProcess(String sessionId, ClassBytes arg, Map payload, InvokerLocator locator)
  -   {
  +      return type;
      }
   
  -   /**
  -    * called after an invocation
  -    *
  -    * @param sessionId
  -    * @param payload
  -    * @param locator
  -    * @todo is sending in the arg appropriate???
  -    */
  -   protected void postProcess(String sessionId, Object param, Map payload, InvokerLocator locator)
  +   private void terminateLease(InvocationRequest invocation)
      {
  -   }
  -
  -   public void create()
  +      if (invocation != null)
      {
  -      if(!created)
  +         String clientSessionId = invocation.getSessionId();
  +         Lease clientLease = (Lease)clientLeases.get(clientSessionId);
  +
  +         if (clientLease != null)
         {
  -         try
  +            boolean clientOnlyTerminated = false;
  +            // now have to determine if is just Client that disconnected
  +            // or if all Clients disconnected, thus the client invoker
  +            // is also disconnected as well.
  +            Map reqMap = invocation.getRequestPayload();
  +            if (reqMap != null)
            {
  -            setup();
  -         }
  -         catch(Exception e)
  +               Object holderObj = reqMap.get(ClientHolder.CLIENT_HOLDER_KEY);
  +               if (holderObj != null && holderObj instanceof ClientHolder)
            {
  -            throw new RuntimeException("Error setting up server invoker " + this, e);
  -         }
  -         created = true;
  +                  // just a client that disconnected, so only need to terminate lease for
  +                  // that particular client (by client session id).
  +                  ClientHolder holder = (ClientHolder) holderObj;
  +                  clientLease.terminateLease(holder.getSessionId());
  +                  clientOnlyTerminated = true;
         }
      }
   
  -   /**
  -    * subclasses should override to provide any specific start logic
  -    *
  -    * @throws IOException
  -    */
  -   public void start() throws IOException
  +            // now see if client invoker needs to be terminated
  +            if (!clientOnlyTerminated)
      {
  -      started = true;
  -      log.debug(this + " started for locator " + getLocator());
  +               clientLease.terminateLease(clientSessionId);
  +               clientLeases.remove(clientSessionId);
      }
  -
  -   /**
  -    * return true if the server invoker is started, false if not
  -    */
  -   public boolean isStarted()
  +         }
  +         else
      {
  -      return started;
  +            log.warn("Asked to terminate lease for client session id " + clientSessionId +
  +               ", but lease for this id could not be found.");
  +         }
  +      }
      }
   
  -   /**
  -    * subclasses should override to provide any specific stop logic
  -    */
  -   public void stop()
  +   private void updateClientLease(InvocationRequest invocation)
      {
  -      started = false;
  +      if(invocation != null)
  +      {
  +         String clientSessionId = invocation.getSessionId();
  +         if(clientSessionId != null)
  +         {
  +            if(trace) { log.trace("Getting lease for client session id: " + clientSessionId); }
   
  -      Iterator it = callbackHandlers.values().iterator();
  -      while (it.hasNext())
  +            Lease clientLease = (Lease)clientLeases.get(clientSessionId);
  +            if(clientLease == null)
         {
  -         ServerInvokerCallbackHandler callbackHandler = (ServerInvokerCallbackHandler) it.next();
  -         callbackHandler.destroy();
  +               Lease newClientLease = new Lease(clientSessionId, leasePeriod,
  +                                                locator.getLocatorURI(),
  +                                                invocation.getRequestPayload(),
  +                                                connectionNotifier,
  +                                                clientLeases);
  +
  +               clientLeases.put(clientSessionId, newClientLease);
  +               newClientLease.startLease();
  +
  +               if(trace) { log.trace("No lease established for client session id (" + clientSessionId + "), so starting a new one."); }
         }
  +            else
  +            {
  +               // including request payload from invocation as may contain updated list of clients.
  +               clientLease.updateLease(leasePeriod, invocation.getRequestPayload());
   
  -      log.debug(this + " stopped");
  +               if(trace) { log.trace("Updated lease for client session id (" + clientSessionId + ")"); }
  +            }
  +         }
  +      }
      }
   
      /**
  -    * destory the invoker permanently
  +    * Takes the real invocation from the client out of the OnewayInvocation and then executes the
  +    * invoke() with the real invocation on a seperate thread.
       */
  -   public void destroy()
  +   private void handleOnewayInvocation(OnewayInvocation onewayInvocation,
  +                                       InvocationRequest invocation) throws Throwable
      {
  -      if(classbyteloader != null)
  +      Object[] objs = onewayInvocation.getParameters();
  +
  +      // The oneway invocation should contain the real param as it's only param in parameter array
  +      Object realParam = objs[0];
  +      invocation.setParameter(realParam);
  +      final InvocationRequest newInvocation = invocation;
  +
  +      ThreadPool executor = getOnewayThreadPool();
  +      Runnable onewayRun = new Runnable()
         {
  -         classbyteloader.destroy();
  -      }
  +         public void run()
  +         {
  +            try
  +            {
  +               invoke(newInvocation);
      }
  -
  -   /**
  -    * Sets the server invoker's transport specific configuration.  Will need to set before calling
  -    * start() method (or at least stop() and start() again) before configurations will take affect.
  -    *
  -    * @param configuration
  -    */
  -   public void setConfiguration(Map configuration)
  +            catch(Throwable e)
      {
  -      this.configuration = configuration;
  +               // throw away exception since can't get it back to original caller
  +               log.error("Error executing server oneway invocation request: " + newInvocation, e);
  +            }
  +         }
  +      };
  +      executor.run(onewayRun);
      }
   
  -   /**
  -    * Gets the server invoker's transport specific configuration.
  -    */
  -   public Map getConfiguration()
  +   private StreamHandler getStreamHandler(InvocationRequest invocation) throws Exception
      {
  -      return configuration;
  +      InternalInvocation inv = (InternalInvocation)invocation.getParameter();
  +      String locator = (String)inv.getParameters()[0];
  +      return new StreamHandler(locator);
      }
   
  -   /**
  -    * Returns the String for the object name to be used for the invoker.
  -    */
  -   public String getMBeanObjectName()
  +   private ServerInvokerCallbackHandler getCallbackHandler(InvocationRequest invocation)
  +      throws Exception
      {
  -      InvokerLocator locator = getLocator();
  -      StringBuffer buffer = new StringBuffer("jboss.remoting:service=invoker,transport= " + locator.getProtocol());
  -      buffer.append(",host=" + locator.getHost());
  -      buffer.append(",port=" + locator.getPort());
  -      Map param = locator.getParameters();
  -      if(param != null)
  +      ServerInvokerCallbackHandler callbackHandler = null;
  +      String id = ServerInvokerCallbackHandler.getId(invocation);
  +      synchronized(callbackHandlers)
         {
  -         Iterator itr = param.keySet().iterator();
  -         while(itr.hasNext())
  +         callbackHandler = (ServerInvokerCallbackHandler) callbackHandlers.get(id);
  +         // if does not exist, create it
  +         if(callbackHandler == null)
            {
  -            buffer.append(",");
  -            String key = (String) itr.next();
  -            String value = (String) param.get(key);
  -            buffer.append(key);
  -            buffer.append("=");
  -            buffer.append(value);
  +            callbackHandler = new ServerInvokerCallbackHandler(invocation, getLocator(), this);
  +            callbackHandlers.put(id, callbackHandler);
            }
         }
  -      return buffer.toString();
  +
  +      if(trace) { log.trace("ServerInvoker (" + this + ") adding server callback handler " + callbackHandler + " with id of " + id + "."); }
  +      return callbackHandler;
      }
   
  -   public void removeCallbackListener(String subsystem, InvokerCallbackHandler callbackHandler)
  +   private ServerInvokerCallbackHandler removeCallbackHandler(InvocationRequest invocation)
      {
  -      ServerInvocationHandler handler = null;
  -      if(subsystem != null)
  +      String id = ServerInvokerCallbackHandler.getId(invocation);
  +      ServerInvokerCallbackHandler callbackHandler = null;
  +
  +      synchronized(callbackHandlers)
         {
  -         handler = (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
  +         callbackHandler = (ServerInvokerCallbackHandler) callbackHandlers.remove(id);
         }
  -      else
  +      return callbackHandler;
  +   }
  +
  +   // Inner classes --------------------------------------------------------------------------------
  +
  +   public static class InvalidStateException extends Exception
         {
  -         // subsystem not specified, so will hope for a default one being set
  -         if(!handlers.isEmpty())
  +      public InvalidStateException(String msg)
            {
  -            handler = (ServerInvocationHandler) handlers.values().iterator().next();
  -         }
  +         super(msg);
         }
  -      handler.removeListener(callbackHandler);
      }
   
      private class CallbackContainer
  @@ -1556,12 +1574,4 @@
         }
      }
   
  -   public static class InvalidStateException extends Exception
  -   {
  -      public InvalidStateException(String msg)
  -      {
  -         super(msg);
  -      }
  -
  -   }
   }
  
  
  



More information about the jboss-cvs-commits mailing list