[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting ...
Ovidiu Feodorov
ovidiu.feodorov at jboss.com
Tue Jan 23 00:08:21 EST 2007
User: ovidiu
Date: 07/01/23 00:08:21
Modified: src/main/org/jboss/remoting Tag: remoting_2_x
MicroRemoteClientInvoker.java ServerInvoker.java
Log:
more reformatting and minor refactoring
Revision Changes Path
No revision
No revision
1.7.2.8 +2 -2 JBossRemoting/src/main/org/jboss/remoting/MicroRemoteClientInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: MicroRemoteClientInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/MicroRemoteClientInvoker.java,v
retrieving revision 1.7.2.7
retrieving revision 1.7.2.8
diff -u -b -r1.7.2.7 -r1.7.2.8
--- MicroRemoteClientInvoker.java 21 Jan 2007 08:38:32 -0000 1.7.2.7
+++ MicroRemoteClientInvoker.java 23 Jan 2007 05:08:21 -0000 1.7.2.8
@@ -27,7 +27,7 @@
*
* @author <a href="mailto:jhaynie at vocalocity.net">Jeff Haynie</a>
* @author <a href="mailto:telrod at e2technologies.net">Tom Elrod</a>
- * @version $Revision: 1.7.2.7 $
+ * @version $Revision: 1.7.2.8 $
*/
public abstract class MicroRemoteClientInvoker extends AbstractInvoker implements ClientInvoker
{
@@ -121,7 +121,7 @@
// Now check if is remoting response and process
if (returnValue instanceof InvocationResponse)
{
- InvocationResponse response = (InvocationResponse) returnValue;
+ InvocationResponse response = (InvocationResponse)returnValue;
returnValue = response.getResult();
// if is a server side exception, throw it
1.52.2.21 +868 -858 JBossRemoting/src/main/org/jboss/remoting/ServerInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/ServerInvoker.java,v
retrieving revision 1.52.2.20
retrieving revision 1.52.2.21
diff -u -b -r1.52.2.20 -r1.52.2.21
--- ServerInvoker.java 21 Jan 2007 08:38:32 -0000 1.52.2.20
+++ ServerInvoker.java 23 Jan 2007 05:08:21 -0000 1.52.2.21
@@ -65,17 +65,17 @@
* @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
*
- * @version $Revision: 1.52.2.20 $
+ * @version $Revision: 1.52.2.21 $
*/
public abstract class ServerInvoker extends AbstractInvoker implements ServerInvokerMBean
{
- private static final Logger log = Logger.getLogger(ServerInvoker.class);
+ // Constants ------------------------------------------------------------------------------------
- private static boolean trace = log.isTraceEnabled();
+ private static final Logger log = Logger.getLogger(ServerInvoker.class);
/**
- * Key for the the maximum number of thread to be used in the thread pool
- * for one way invocations (server side).
+ * Key for the maximum number of threads to be used in the thread pool for one way
+ * invocations (server side).
* This property is only used when the default oneway thread pool is used.
*/
public static final String MAX_NUM_ONEWAY_THREADS_KEY = "maxNumThreadsOneway";
@@ -84,15 +84,17 @@
* Key for setting the setting the oneway thread pool to use.
* The value used with this key will first be checked to see if is a JMX ObjectName and if so,
* try to look up associated mbean for the ObjectName given and cast to type
- * org.jboss.util.threadpool.ThreadPoolMBean (via MBeanServerInvocationHandler.newProxyInstance()).
- * If the value is not a JMX ObjectName, will assume is a fully qualified classname and load the
- * coresponding class and create a new instance of it (which will require it to have a void constructor).
- * The newly created instance will then be cast to type of org.jboss.util.threadpool.ThreadPool.
+ * org.jboss.util.threadpool.ThreadPoolMBean
+ * (via MBeanServerInvocationHandler.newProxyInstance()). If the value is not a JMX ObjectName,
+ * will assume is a fully qualified classname and load the corresponding class and create a new
+ * instance of it (which will require it to have a void constructor). The newly created instance
+ * will then be cast to type of org.jboss.util.threadpool.ThreadPool.
*/
public static final String ONEWAY_THREAD_POOL_CLASS_KEY = "onewayThreadPool";
/**
- * Key for setting the address the server invoker should bind to. The value can be either host name or IP.
+ * Key for setting the address the server invoker should bind to.
+ * The value can be either host name or IP.
*/
public static final String SERVER_BIND_ADDRESS_KEY = "serverBindAddress";
@@ -100,31 +102,34 @@
* Key for setting the addres the client invoker should connecto to.
* This should be used when client will be connecting to server from outside the server's network
* and the external address is different from that of the internal address the server invoker
- * will bind to (e.g. using NAT to expose different external address). This will mostly be
- * useful when client uses remoting detection to discover remoting servers.
- * The value can be either host name or IP.
+ * will bind to (e.g. using NAT to expose different external address). This will mostly be useful
+ * when client uses remoting detection to discover remoting servers. The value can be either host
+ * name or IP.
*/
public static final String CLIENT_CONNECT_ADDRESS_KEY = "clientConnectAddress";
/**
* Key for setting the port the server invoker should bind to.
- * If the value supplied is less than or equal to zero, the server invoker will randomly choose a free port to use.
+ * If the value supplied is less than or equal to zero, the server invoker will randomly choose
+ * a free port to use.
*/
public static final String SERVER_BIND_PORT_KEY = "serverBindPort";
/**
* key for setting the port the client invoker should connect to.
- * This should be used when client will be connecting to server from outside the server's
- * network and the external port is different from that of the internal port the server
- * invoker will bind to (e.g. using NAT to expose different port routing).
- * This will be mostly useful when client uses remoting detection to discover remoting servers.
+ * This should be used when client will be connecting to server from outside the server's network
+ * and the external port is different from that of the internal port the server invoker will bind
+ * to (e.g. using NAT to expose different port routing). This will be mostly useful when client
+ * uses remoting detection to discover remoting servers.
*/
public static final String CLIENT_CONNECT_PORT_KEY = "clientConnectPort";
/**
- * Key used for setting the amount of time (in milliseconds) that a client should renew its lease.
- * If this value is not set, the default of five seconds (see DEFAULT_CLIENT_LEASE_PERIOD), will be used.
- * This value will also be what is given to the client when it initially querys server for leasing information.
+ * Key used for setting the amount of time (in milliseconds) that a client should renew its
+ * lease.
+ * If this value is not set, the default of five seconds (see DEFAULT_CLIENT_LEASE_PERIOD), will
+ * be used. This value will also be what is given to the client when it initially queries server
+ * for leasing information.
*/
public static final String CLIENT_LEASE_PERIOD = "clientLeasePeriod";
@@ -139,52 +144,50 @@
* a proxy to it with type of org.jboss.remoting.security.ServerSocketFactoryMBean
* (via MBeanServerInvocationHandler.newProxyInstance()). If not a JMX ObjectName, will assume
* is the fully qualified classname to the implementation to be used and will load the class,
- * create a new instance of it (which requires it to have a void constructor).
- * The instance will then be cast to type javax.net.ServerSocketFactory.
+ * create a new instance of it (which requires it to have a void constructor). The instance will
+ * then be cast to type javax.net.ServerSocketFactory.
*/
public static final String SERVER_SOCKET_FACTORY = "serverSocketFactory";
/**
- * The max number of worker threads to be used in the
- * pool for processing one way calls on the server side.
- * Value is is 100.
+ * The max number of worker threads to be used in the pool for processing one way calls on the
+ * server side. Value is 100.
*/
public static final int MAX_NUM_ONEWAY_THREADS = 100;
/**
- * The default lease period for clients. This is the number of
- * milliseconds that a client will be required to renew their lease
- * with the server. The default value is 5 seconds.
+ * The default lease period for clients. This is the number of milliseconds that a client will be
+ * required to renew their lease with the server. The default value is 5 seconds.
*/
public static final int DEFAULT_CLIENT_LEASE_PERIOD = 5000;
/**
- * The default timeout period for socket connections.
- * The default value is 60000 milliseconds.
+ * The default timeout period for socket connections. The default value is 60000 milliseconds.
*/
public static final int DEFAULT_TIMEOUT_PERIOD = 60000;
+
+
+ // Static ---------------------------------------------------------------------------------------
+
+ private static boolean trace = log.isTraceEnabled();
+
+ // Attributes -----------------------------------------------------------------------------------
+
/**
* Indicated the max number of threads used within oneway thread pool.
*/
private int maxNumberThreads = MAX_NUM_ONEWAY_THREADS;
private String onewayThreadPoolClass = null;
private ThreadPool onewayThreadPool;
-
- protected Map handlers = new HashMap();
- protected Map callbackHandlers = new HashMap();
- protected Map clientCallbackListener = new HashMap();
- protected boolean started = false;
private boolean created = false;
private MBeanServer mbeanServer = null;
private String dataType;
-
private String serverBindAddress = null;
private int serverBindPort = 0;
private String clientConnectAddress = null;
private int clientConnectPort = -1;
-
private int timeout = DEFAULT_TIMEOUT_PERIOD;
// indicates the lease timeout period for clients
@@ -192,10 +195,16 @@
private boolean leaseManagement = false;
private Map clientLeases = new HashMap();
- protected ConnectionNotifier connectionNotifier = new ConnectionNotifier();
+ protected Map handlers = new HashMap();
+ protected Map callbackHandlers = new HashMap();
+ protected Map clientCallbackListener = new HashMap();
+ protected boolean started = false;
+ protected ConnectionNotifier connectionNotifier = new ConnectionNotifier();
protected ServerSocketFactory serverSocketFactory = null;
+ // Constructors ---------------------------------------------------------------------------------
+
public ServerInvoker(InvokerLocator locator)
{
super(locator);
@@ -211,7 +220,9 @@
super(locator, configuration);
if (configuration != null)
+ {
this.configuration.putAll(configuration);
+ }
Map locatorParams = locator.getParameters();
if(locatorParams != null)
@@ -220,987 +231,974 @@
}
}
- protected void setup() throws Exception
- {
- Map config = getConfiguration();
- String maxNumOfThreads = (String) config.get(MAX_NUM_ONEWAY_THREADS_KEY);
- if(maxNumOfThreads != null && maxNumOfThreads.length() > 0)
- {
- try
+ // Public ---------------------------------------------------------------------------------------
+
+ public void setServerSocketFactory(ServerSocketFactory serverSocketFactory)
{
- maxNumberThreads = Integer.parseInt(maxNumOfThreads);
+ this.serverSocketFactory = serverSocketFactory;
}
- catch(NumberFormatException e)
+
+ public ServerSocketFactory getServerSocketFactory()
{
- log.error("Can not convert max number of threads value (" + maxNumOfThreads + ") into a number.");
- }
+ return serverSocketFactory;
}
- onewayThreadPoolClass = (String) config.get(ONEWAY_THREAD_POOL_CLASS_KEY);
- String locatorHost = locator.getHost();
- InetAddress addr = null;
- if(locatorHost != null)
+ /**
+ * Sets timeout (in milliseconds) to be used for the socket connection.
+ */
+ public void setTimeout(int timeout)
{
- addr = InetAddress.getByName(locator.getHost());
+ this.timeout = timeout;
}
- else
+
+ /**
+ * The timeout (in milliseconds) used for the socket connection.
+ */
+ public int getTimeout()
{
- addr = InetAddress.getLocalHost();
+ return timeout;
}
- int port = locator.getPort();
- if(port <= 0)
+
+ public boolean isLeaseActivated()
{
- port = assignPort();
+ return leaseManagement;
}
- // set the bind address
- serverBindAddress = (String) config.get(SERVER_BIND_ADDRESS_KEY);
- clientConnectAddress = (String) config.get(CLIENT_CONNECT_ADDRESS_KEY);
- if(serverBindAddress == null)
+ public void addConnectionListener(ConnectionListener listener)
{
- if(clientConnectAddress != null)
+ if(listener != null)
{
- // can't use uri address, as is for client only
- serverBindAddress = InetAddress.getLocalHost().getHostAddress();
+ connectionNotifier.addListener(listener);
+
+ if(leasePeriod > 0)
+ {
+ leaseManagement = true;
+ }
}
else
{
- serverBindAddress = addr.getHostAddress();
+ throw new IllegalArgumentException("Can not add null ConnectionListener.");
}
}
- // set the bind port
- String serverBindPortString = (String) config.get(SERVER_BIND_PORT_KEY);
- String clientConnectPortString = (String) config.get(CLIENT_CONNECT_PORT_KEY);
- if(clientConnectPortString != null)
- {
- try
- {
- clientConnectPort = Integer.parseInt(clientConnectPortString);
- }
- catch(NumberFormatException e)
+ public void removeConnectionListener(ConnectionListener listener)
{
- throw new InvalidConfigurationException("Can not set client bind port because can not convert given value (" + clientConnectPortString + ") to a number.");
- }
- }
- if(serverBindPortString != null)
+ if(connectionNotifier != null)
{
- try
+ connectionNotifier.removeListener(listener);
+
+ // turn off lease management if no listeners (since no one to tell client died)
+ if(connectionNotifier.size() == 0)
{
- serverBindPort = Integer.parseInt(serverBindPortString);
- if(serverBindPort <= 0)
+ leaseManagement = false;
+
+ // go through any existing leases and terminate them
+ Set clientKeys = clientLeases.keySet();
+ Iterator itr = clientKeys.iterator();
+ while(itr.hasNext())
{
- serverBindPort = assignPort();
+ String sessionId = (String)itr.next();
+ Lease clientLease = (Lease)clientLeases.get(sessionId);
+ clientLease.terminateLease(sessionId);
}
-
+ clientLeases.clear();
}
- catch(NumberFormatException e)
- {
- throw new InvalidConfigurationException("Can not set server bind port because can not convert given value (" + serverBindPortString + ") to a number.");
}
}
- else
+
+ /**
+ * Sets the amount of time (in milliseconds) that a client should renew its lease. If this value
+ * is not set, the default of five seconds (see DEFAULT_CLIENT_LEASE_PERIOD), will be used. This
+ * value will also be what is given to the client when it initially queries server for leasing
+ * information. If set after create() method called, this value will override value set by
+ * CLIENT_LEASE_PERIOD key.
+ */
+ public void setLeasePeriod(long leasePeriodValue)
{
- if(clientConnectPort > 0)
+ this.leasePeriod = leasePeriodValue;
+
+ if (leasePeriod <= 0)
{
- // can't use uri port, as is for client only
- serverBindPort = PortUtil.findFreePort(locator.getHost());
+ this.leaseManagement = false;
}
else
{
- serverBindPort = port;
- }
- }
-
- // get timeout config
- String timeoutPeriod = (String)config.get(TIMEOUT);
- if(timeoutPeriod != null && timeoutPeriod.length() > 0)
- {
- try
+ if(connectionNotifier != null && connectionNotifier.size() > 0)
{
- timeout = Integer.parseInt(timeoutPeriod);
+ this.leaseManagement = true;
}
- catch(NumberFormatException e)
- {
- throw new InvalidConfigurationException("Can not set timeout because can not convert give value (" + timeoutPeriod + ") to a number.");
}
}
- // config for client lease period
- String clientLeasePeriod = (String)config.get(CLIENT_LEASE_PERIOD);
- if(clientLeasePeriod != null)
- {
- try
+ /**
+ * Gets the amount of time (in milliseconds) that a client should renew its lease.
+ */
+ public long getLeasePeriod()
{
- long leasePeriodValue = Long.parseLong(clientLeasePeriod);
- setLeasePeriod(leasePeriodValue);
+ return leasePeriod;
}
- catch(NumberFormatException e)
+
+ /**
+ * @jmx:managed-attribute
+ */
+ public String getClientConnectAddress()
{
- throw new InvalidConfigurationException("Can not set client lease period because can not convert given value (" + clientLeasePeriod + ") to a number.");
- }
+ return clientConnectAddress;
}
- createServerSocketFactory();
+ public int getClientConnectPort()
+ {
+ return clientConnectPort;
}
- protected int assignPort() throws IOException
+ public void setClientConnectPort(int clientConnectPort)
{
- int port;
- port = PortUtil.findFreePort(locator.getHost());
- // re-write locator since the port is different
- InvokerLocator newLocator = new InvokerLocator(locator.getProtocol(), locator.getHost(), port, locator.getPath(), locator.getParameters());
- // need to update the locator key used in the invoker registry
- InvokerRegistry.updateServerInvokerLocator(locator, newLocator);
- this.locator = newLocator;
- return port;
+ this.clientConnectPort = clientConnectPort;
}
- public void setServerSocketFactory(ServerSocketFactory serverSocketFactory)
+ /**
+ * This method should only be called by the service controller when this invoker is specified
+ * within the Connector configuration of a service xml. Calling this directly will have no
+ * effect, as will be used in building the locator uri that is published for detection and this
+ * happens when the invoker is first created and started (after that, no one will be aware of a
+ * change).
+ *
+ * @jmx:managed-attribute
+ */
+ public void setClientConnectAddress(String clientConnectAddress)
{
- this.serverSocketFactory = serverSocketFactory;
+ this.clientConnectAddress = clientConnectAddress;
}
- public ServerSocketFactory getServerSocketFactory()
+ public String getServerBindAddress()
{
- return serverSocketFactory;
+ return serverBindAddress;
}
- protected ServerSocketFactory createServerSocketFactory() throws IOException
- {
- // only want to look at config if server socket factory has not already been set
- if(serverSocketFactory == null)
- {
- Object obj = configuration.get(Remoting.CUSTOM_SERVER_SOCKET_FACTORY);
- if (obj != null)
- {
- if (obj instanceof ServerSocketFactory)
+ public int getServerBindPort()
{
- serverSocketFactory = (ServerSocketFactory) obj;
+ return serverBindPort;
}
- else
+
+ /**
+ * Sets the maximum number of thread to be used in the thread pool for one way invocations
+ * (server side). This property is only used when the default oneway thread pool is used. If set
+ * after create() method called, this value will override value set by MAX_NUM_ONEWAY_THREADS_KEY
+ * key.
+ */
+ public void setMaxNumberOfOnewayThreads(int numOfThreads)
{
- throw new RuntimeException("Can not set custom server socket factory (" + obj + ") as is not of type javax.net.SocketFactory");
- }
+ this.maxNumberThreads = numOfThreads;
}
- if (serverSocketFactory == null)
- {
/**
- * TODO: -TME This is another big hack because of dependancy on JMX within configuration.
- * Have to wait till the mbean server is set before can actually set the server socket
- * factory since it is an mbean (new server's DI will fix all this). Would prefer this
- * to be in the setup() method...
- * Also, I can't cast the mbean proxy directly to ServerSocketFactory because it is not
- * an interface. Therefore, have to require that ServerSocketFactoryMBean is used.
+ * Gets the maximum number of thread to be used in the thread pool for one way invocations
+ * (server side).
*/
- String serverSocketFactoryString = (String)configuration.get(SERVER_SOCKET_FACTORY);
- if(serverSocketFactoryString != null && serverSocketFactoryString.length() > 0)
- {
- try
- {
- if(serverSocketFactoryString != null)
+ public int getMaxNumberOfOnewayThreads()
{
- MBeanServer server = getMBeanServer();
- ObjectName serverSocketFactoryObjName = new ObjectName(serverSocketFactoryString);
- if(server != null)
+ return this.maxNumberThreads;
+ }
+
+ /**
+ * Gets the oneway thread pool to use.
+ */
+ public ThreadPool getOnewayThreadPool()
{
- try
+ if(onewayThreadPool == null)
{
- ServerSocketFactoryMBean serverSocketFactoryMBean = (ServerSocketFactoryMBean) MBeanServerInvocationHandler.newProxyInstance(server,
- serverSocketFactoryObjName,
- ServerSocketFactoryMBean.class,
- false);
- serverSocketFactory = new ServerSocketFactoryWrapper(serverSocketFactoryMBean);
- }
- catch(Exception e)
+ // if no thread pool class set, then use default BasicThreadPool
+ if(onewayThreadPoolClass == null || onewayThreadPoolClass.length() == 0)
{
- log.debug("Error creating mbean proxy for server socket factory for object name: " + serverSocketFactoryObjName + ". Will try by class name.");
- }
+ BasicThreadPool pool = new BasicThreadPool("JBossRemoting Client Oneway");
+ pool.setMaximumPoolSize(maxNumberThreads);
+ pool.setBlockingMode(BlockingMode.WAIT);
+ onewayThreadPool = pool;
}
else
{
- log.debug("The 'serverSocketFactory' attribute was set with a value, but the MBeanServer reference is null.");
- }
- }
- }
- catch(MalformedObjectNameException e)
+ //first check to see if this is an ObjectName
+ boolean isObjName = false;
+ try
{
- log.debug("Attibute value (" + serverSocketFactoryString + ") passed is not a valid ObjectName. Can not look up if is a mbean service. Will try by classname.");
+ ObjectName objName = new ObjectName(onewayThreadPoolClass);
+ onewayThreadPool = createThreadPoolProxy(objName);
+ isObjName = true;
}
- catch(NullPointerException e)
+ catch(MalformedObjectNameException e)
{
- log.debug("Could not set up the server socket factory as a mbean service due to null pointer exception.");
+ log.debug("Thread pool class supplied is not an object name.");
}
- // couldn't create from object name for mbean service, will try by class name
- if(serverSocketFactory == null)
+ if(!isObjName)
{
try
{
- Class cl = ClassLoaderUtility.loadClass(serverSocketFactoryString, getClass());
-
- Constructor serverSocketConstructor = null;
- serverSocketConstructor = cl.getConstructor(new Class[]{});
- serverSocketFactory = (ServerSocketFactory)serverSocketConstructor.newInstance(new Object[] {});
- log.trace("ServerSocketFactory (" + serverSocketFactoryString + ") loaded");
+ onewayThreadPool = (ThreadPool)Class.
+ forName(onewayThreadPoolClass, false, getClassLoader()).newInstance();
}
catch(Exception e)
{
- log.debug("Could not create server socket factory by classname (" + serverSocketFactoryString + "). Error message: " + e.getMessage());
+ throw new RuntimeException("Error loading instance of ThreadPool based " +
+ "on class name " + onewayThreadPoolClass);
}
}
}
}
+ return onewayThreadPool;
}
- if (serverSocketFactory == null && needsCustomSSLConfiguration(configuration))
- {
- try
+ /**
+ * Sets the oneway thread pool to use.
+ */
+ public void setOnewayThreadPool(ThreadPool pool)
{
- SSLSocketBuilder socketBuilder = new SSLSocketBuilder(configuration);
- socketBuilder.setUseSSLServerSocketFactory( false );
- serverSocketFactory = socketBuilder.createSSLServerSocketFactory();
+ this.onewayThreadPool = pool;
}
- catch (IOException e)
+
+ public MBeanServer getMBeanServer()
{
- throw new RuntimeException("Unable to create customized SSL socket factory", e);
- }
+ return mbeanServer;
}
- if(serverSocketFactory == null)
+ public void setMBeanServer(MBeanServer server)
{
- log.debug(this + " did not find server socket factory configuration as mbean service " +
- "or classname. Creating default server socket factory.");
-
- serverSocketFactory = getDefaultServerSocketFactory();
+ // This has been added in order to support mbean service configuration. Now supporting
+ // classes, such as the ServerInvokerCallbackHandler can find and use resources such as
+ // CallbackStore, which can be run as a service mbean (and specified via object name within
+ // config). The use of JMX throughout remoting is a problem as now have to tie it in all
+ // throughout the code for service configuration as is being done here. When migrate to use
+ // under new server model, which does not depend on JMX, can rip out code such as this.
+ this.mbeanServer = server;
}
- log.debug(this + " created server socket factory " + serverSocketFactory);
-
- serverSocketFactory = wrapServerSocketFactory(serverSocketFactory, configuration);
- return serverSocketFactory;
-
+ /**
+ * @return true if a server invocation handler has been registered for this subsystem.
+ */
+ public synchronized boolean hasInvocationHandler(String subsystem)
+ {
+ return handlers.containsKey(subsystem);
}
- protected boolean justNeedsSSLClientMode(Map configuration)
- {
- if (configuration.size() == 1 && configuration.containsKey(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE))
+ /**
+ * @return an array of keys for each subsystem this invoker can handle.
+ */
+ public synchronized String[] getSupportedSubsystems()
{
- String useClientModeString = (String) configuration.get(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE);
- return Boolean.valueOf(useClientModeString).booleanValue();
+ String subsystems [] = new String[handlers.size()];
+ return (String[]) handlers.keySet().toArray(subsystems);
}
- if (configuration.size() == 1 && configuration.containsKey(SSLSocketBuilder.REMOTING_SOCKET_USE_CLIENT_MODE))
+ /**
+ * @return an array of the server invocation handlers.
+ */
+ public synchronized ServerInvocationHandler[] getInvocationHandlers()
{
- String useClientModeString = (String) configuration.get(SSLSocketBuilder.REMOTING_SOCKET_USE_CLIENT_MODE);
- return Boolean.valueOf(useClientModeString).booleanValue();
+ ServerInvocationHandler ih [] = new ServerInvocationHandler[handlers.size()];
+ return (ServerInvocationHandler[]) handlers.values().toArray(ih);
}
- if (configuration.size() == 2
- && configuration.containsKey(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE)
- && configuration.containsKey(SSLSocketBuilder.REMOTING_SOCKET_USE_CLIENT_MODE))
+ /**
+ * Add a server invocation handler for a particular subsystem. Typically, subsystems are defined
+ * in org.jboss.remoting.Subsystem, however, this can be any string that the caller knows about.
+ *
+ * @return previous ServerInvocationHandler with the same subsystem value (case insensitive) or
+ * null if a previous one did not exist.
+ */
+ public synchronized ServerInvocationHandler addInvocationHandler(String subsystem,
+ ServerInvocationHandler handler)
{
- String useClientModeString = (String) configuration.get(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE);
- return Boolean.valueOf(useClientModeString).booleanValue();
- }
+ handler.setInvoker(this);
- return false;
+ ServerInvocationHandler oldHandler =
+ (ServerInvocationHandler)handlers.put(subsystem.toUpperCase(), handler);
+
+ log.debug(this + " added " + handler + " for subsystem '" + subsystem + "'" +
+ (oldHandler == null ? "" : ", replacing old handler " + oldHandler));
+
+ return oldHandler;
}
/**
- * Gets the default server socket factory to use for the server invoker. The intention
- * is this method will be overridden by sub-classes for their specific defaults.
- * @return
+ * Remove a subsystem invocation handler.
*/
- protected ServerSocketFactory getDefaultServerSocketFactory() throws IOException
+ public synchronized ServerInvocationHandler removeInvocationHandler(String subsystem)
{
- return ServerSocketFactory.getDefault();
- }
+ ServerInvocationHandler handler =
+ (ServerInvocationHandler)handlers.remove(subsystem.toUpperCase());
+
+ log.debug(this + (handler == null ?
+ " tried to remove handler for " + subsystem + " but no handler found" :
+ " removed handler " + handler + " for subsystem '" + subsystem + "'"));
+ return handler;
+ }
- protected ServerSocketFactory wrapServerSocketFactory(ServerSocketFactory ssf, Map config)
+ /**
+ * Get a ServerInvocationHandler for a given subsystem type.
+ */
+ public synchronized ServerInvocationHandler getInvocationHandler(String subsystem)
{
- if (config == null)
- return ssf;
+ return (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
+ }
- Object o = config.get(Remoting.SOCKET_CREATION_SERVER_LISTENER);
+ public Object invoke(Object invoke) throws IOException
+ {
+ InvocationRequest request = null;
+ InvocationResponse response = null;
- if (o == null)
- return ssf;
+ if(trace) { log.trace("server received invocation " + invoke); }
- if (o instanceof SocketCreationListener)
- {
- return new CreationListenerServerSocketFactory(ssf, (SocketCreationListener) o);
- }
- else if (o instanceof String)
+ if(invoke != null && invoke instanceof InvocationRequest)
{
+ request = (InvocationRequest) invoke;
try
{
- Class c = ClassLoaderUtility.loadClass((String) o, ServerInvoker.class);
- SocketCreationListener listener = (SocketCreationListener) c.newInstance();
- return new CreationListenerServerSocketFactory(ssf, listener);
+
+ Object result = invoke(request);
+
+ response = new InvocationResponse(request.getSessionId(),
+ result, false, request.getReturnPayload());
+
}
- catch (Exception e)
+ catch(Throwable throwable)
{
- log.error("unable to instantiate class: " + o, e);
- return ssf;
+ response = new InvocationResponse(request.getSessionId(),
+ throwable, true, request.getReturnPayload());
}
}
else
{
- log.error("unrecognized type for socket creation server listener: " + o);
- return ssf;
+ log.error("server invoker received " + invoke + " as invocation. " +
+ "Must not be null and must be of type InvocationRequest.");
+
+ Exception e = new Exception("Error processing invocation request on " + getLocator() +
+ ". Either invocation was null or of wrong type.");
+
+ response =
+ new InvocationResponse(request.getSessionId(), e, true, request.getReturnPayload());
}
+ return response;
}
-
/**
- * Sets timeout (in millseconds) to be used for the socket connection.
- * @param timeout
+ * Processes invocation request depending on the invocation type (internal, name based, oneway,
+ * etc). Can be called on directly when client and server are local to one another (by-passing
+ * serialization).
*/
- public void setTimeout(int timeout)
+ public Object invoke(InvocationRequest invocation) throws Throwable
{
- this.timeout = timeout;
- }
+ if (isStarted())
+ {
+ Object param = invocation.getParameter();
+ Object result = null;
- /**
- * The timeout (in milliseconds) used for the socket connection.
- */
- public int getTimeout()
+ // check to see if this is a is alive ping
+ if ("$PING$".equals(param))
{
- return timeout;
+ //if checking lease, need to update lease flag
+ if (leaseManagement)
+ {
+ //NOTE we only update the lease when we receive a PING, not for
+ //all invocations
+ updateClientLease(invocation);
}
- public boolean isLeaseActivated()
- {
- return leaseManagement;
+ // if this is an invocation ping, just pong back
+ Map responseMap = new HashMap();
+ responseMap.put(CLIENT_LEASE_PERIOD, new Long(leasePeriod));
+ return new InvocationResponse(invocation.getSessionId(), new Boolean(leaseManagement),
+ false, responseMap);
}
- public void addConnectionListener(ConnectionListener listener)
+ if ("$DISCONNECT$".equals(param))
{
- if(listener != null)
+ if (leaseManagement)
{
- connectionNotifier.addListener(listener);
+ terminateLease(invocation);
+ }
+ return null;
+ }
- if(leasePeriod > 0)
+ //TODO: -TME both oneway and internal invocation will be broken since have not
+ // deserialized the para yet (removed ClassUtil.deserialize() so would let handler do it).
+ if (param instanceof OnewayInvocation)
{
- leaseManagement = true;
+ // no point in delaying return to client if oneway
+ handleOnewayInvocation((OnewayInvocation)param, invocation);
}
+ else
+ {
+ String subsystem = invocation.getSubsystem();
+ String clientId = invocation.getSessionId();
+
+ // too bad we can't optimize this a little better, since we take a lookup hit for
+ // each invocation -JGH
+ ServerInvocationHandler handler = null;
+ if (subsystem != null)
+ {
+ handler = (ServerInvocationHandler)handlers.get(subsystem.toUpperCase());
}
else
{
- throw new IllegalArgumentException("Can not add null ConnectionListener.");
+ // subsystem not specified, so will hope for a default one being set
+ if (!handlers.isEmpty())
+ {
+ handler = (ServerInvocationHandler)handlers.values().iterator().next();
}
}
- public void removeConnectionListener(ConnectionListener listener)
+ if (param instanceof InternalInvocation)
{
- if(connectionNotifier != null)
+ result = handleInternalInvocation((InternalInvocation)param, invocation, handler);
+ }
+ else
{
- connectionNotifier.removeListener(listener);
+ if (trace) { log.trace(this + " dispatching " + invocation + " from client " + clientId + " to subsystem '" + subsystem + "'"); }
- // turn off lease management if no listeners (since no one to tell client died)
- if(connectionNotifier.size() == 0)
+ if (handler == null)
{
- leaseManagement = false;
+ throw new InvalidConfigurationException(
+ "Can not handle invocation request for subsystem(" + subsystem + ") because " +
+ "there are no matching ServerInvocationHandlers registered. Please add via " +
+ "xml configuration or via the Connector's addInvocationHandler() method.");
+ }
+ result = handler.invoke(invocation);
+ }
- // go through any existing leases and terminate them
- Set clientKeys = clientLeases.keySet();
- Iterator itr = clientKeys.iterator();
- while(itr.hasNext())
- {
- String sessionId = (String)itr.next();
- Lease clientLease = (Lease)clientLeases.get(sessionId);
- clientLease.terminateLease(sessionId);
+ if (trace) { log.trace(this + " successfully dispatched invocation, returning " + result + " from subsystem '" + subsystem + "' to client " + clientId); }
}
- clientLeases.clear();
+
+ return result;
}
+ else
+ {
+ log.warn(this + " can not process invocation requests since is not in started state!");
+ throw new InvalidStateException(
+ "Can not process invocation request since is not in started state.");
}
}
/**
- * Sets the amount of time (in milliseconds) that a client should renew its lease. If this value
- * is not set, the default of five seconds (see DEFAULT_CLIENT_LEASE_PERIOD), will be used.
- * This value will also be what is given to the client when it initially querys server for
- * leasing information. If set after create() method called, this value will override value set
- * by CLIENT_LEASE_PERIOD key.
+ * Will get the data type for the marshaller factory so know which marshaller to get to marshal
+ * the data. Will first check the locator uri for a 'datatype' parameter and take that value if
+ * it exists. Otherwise, will use the default datatype for the client invoker, based on
+ * transport.
*/
- public void setLeasePeriod(long leasePeriodValue)
+ public String getDataType()
{
- this.leasePeriod = leasePeriodValue;
-
- if (leasePeriod <= 0)
+ if(dataType == null)
{
- this.leaseManagement = false;
+ dataType = getDataType(getLocator());
+ if(dataType == null)
+ {
+ dataType = getDefaultDataType();
}
- else
+ }
+ return dataType;
+ }
+
+ public void create()
{
- if(connectionNotifier != null && connectionNotifier.size() > 0)
+ if(!created)
{
- this.leaseManagement = true;
+ try
+ {
+ setup();
+ }
+ catch(Exception e)
+ {
+ throw new RuntimeException("Error setting up server invoker " + this, e);
}
+ created = true;
}
}
/**
- * Gets the amount of time (in milliseconds) that a client should renew its lease.
+ * Subclasses should override to provide any specific start logic.
*/
- public long getLeasePeriod()
+ public void start() throws IOException
{
- return leasePeriod;
+ started = true;
+ log.debug(this + " started for locator " + getLocator());
}
/**
- * @jmx:managed-attribute
+ * @return true if the server invoker is started, false if not.
*/
- public String getClientConnectAddress()
+ public boolean isStarted()
{
- return clientConnectAddress;
- }
-
- public int getClientConnectPort()
- {
- return clientConnectPort;
- }
-
- public void setClientConnectPort(int clientConnectPort)
- {
- this.clientConnectPort = clientConnectPort;
+ return started;
}
/**
- * This method should only be called by the service controller when this invoker is
- * specified within the Connector configuration of a service xml. Calling this directly
- * will have no effect, as will be used in building the locator uri that is published
- * for detection and this happens when the invoker is first created and started (after that, no one
- * will be aware of a change).
- *
- * @jmx:managed-attribute
+ * Subclasses should override to provide any specific stop logic.
*/
- public void setClientConnectAddress(String clientConnectAddress)
+ public void stop()
{
- this.clientConnectAddress = clientConnectAddress;
- }
-
+ started = false;
- public String getServerBindAddress()
+ for(Iterator i = callbackHandlers.values().iterator(); i.hasNext(); )
{
- return serverBindAddress;
+ ServerInvokerCallbackHandler callbackHandler = (ServerInvokerCallbackHandler)i.next();
+ callbackHandler.destroy();
}
- public int getServerBindPort()
- {
- return serverBindPort;
+ log.debug(this + " stopped");
}
/**
- * Sets the maximum number of thread to be used in the thread pool for one way invocations (server side).
- * This property is only used when the default oneway thread pool is used.
- * If set after create() method called, this value will override value set by MAX_NUM_ONEWAY_THREADS_KEY key.
- * @param numOfThreads
+ * Destroy the invoker permanently.
*/
- public void setMaxNumberOfOnewayThreads(int numOfThreads)
+ public void destroy()
{
- this.maxNumberThreads = numOfThreads;
+ if(classbyteloader != null)
+ {
+ classbyteloader.destroy();
+ }
}
/**
- * Gets the maximum number of thread to be used in the thread pool for one way invocations (server side).
- * @return
+ * Sets the server invoker's transport specific configuration. Will need to set before calling
+ * start() method (or at least stop() and start() again) before configurations will take effect.
*/
- public int getMaxNumberOfOnewayThreads()
+ public void setConfiguration(Map configuration)
{
- return this.maxNumberThreads;
+ this.configuration = configuration;
}
/**
- * Gets the oneway thread pool to use.
- * @return
+ * Gets the server invoker's transport specific configuration.
*/
- public ThreadPool getOnewayThreadPool()
+ public Map getConfiguration()
{
- if(onewayThreadPool == null)
+ return configuration;
+ }
+
+ public void removeCallbackListener(String subsystem, InvokerCallbackHandler callbackHandler)
{
- // if no thread pool class set, then use default BasicThreadPool
- if(onewayThreadPoolClass == null || onewayThreadPoolClass.length() == 0)
+ ServerInvocationHandler handler = null;
+ if(subsystem != null)
{
- BasicThreadPool pool = new BasicThreadPool("JBossRemoting Client Oneway");
- pool.setMaximumPoolSize(maxNumberThreads);
- pool.setBlockingMode(BlockingMode.WAIT);
- onewayThreadPool = pool;
+ handler = (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
}
else
{
- //first check to see if this is an ObjectName
- boolean isObjName = false;
- try
+ // subsystem not specified, so will hope for a default one being set
+ if(!handlers.isEmpty())
{
- ObjectName objName = new ObjectName(onewayThreadPoolClass);
- onewayThreadPool = createThreadPoolProxy(objName);
- isObjName = true;
+ handler = (ServerInvocationHandler) handlers.values().iterator().next();
}
- catch(MalformedObjectNameException e)
- {
- log.debug("Thread pool class supplied is not an object name.");
+ }
+ handler.removeListener(callbackHandler);
}
- if(!isObjName)
+ /**
+ * @return the String for the object name to be used for the invoker.
+ */
+ public String getMBeanObjectName()
{
- try
+ InvokerLocator locator = getLocator();
+ StringBuffer buffer =
+ new StringBuffer("jboss.remoting:service=invoker,transport= " + locator.getProtocol());
+ buffer.append(",host=").append(locator.getHost());
+ buffer.append(",port=").append(locator.getPort());
+ Map param = locator.getParameters();
+ if(param != null)
{
- onewayThreadPool = (ThreadPool) Class.forName(onewayThreadPoolClass, false, getClassLoader()).newInstance();
- }
- catch(Exception e)
+ Iterator itr = param.keySet().iterator();
+ while(itr.hasNext())
{
- throw new RuntimeException("Error loading instance of ThreadPool based on class name: " + onewayThreadPoolClass);
+ buffer.append(",");
+ String key = (String) itr.next();
+ String value = (String) param.get(key);
+ buffer.append(key);
+ buffer.append("=");
+ buffer.append(value);
}
}
-
+ return buffer.toString();
}
- }
- return onewayThreadPool;
- }
+ // Package protected ----------------------------------------------------------------------------
- /**
- * Sets the oneway thread pool to use.
- * @param pool
- */
- public void setOnewayThreadPool(ThreadPool pool)
+ // Protected ------------------------------------------------------------------------------------
+
+ protected abstract String getDefaultDataType();
+
+ protected void setup() throws Exception
{
- this.onewayThreadPool = pool;
- }
+ Map config = getConfiguration();
+ String maxNumOfThreads = (String)config.get(MAX_NUM_ONEWAY_THREADS_KEY);
- private ThreadPool createThreadPoolProxy(ObjectName objName)
+ if(maxNumOfThreads != null && maxNumOfThreads.length() > 0)
{
- ThreadPool pool;
- MBeanServer server = getMBeanServer();
- if(server != null)
+ try
{
- ThreadPoolMBean poolMBean = (ThreadPoolMBean)
- MBeanServerInvocationHandler.newProxyInstance(server,
- objName,
- ThreadPoolMBean.class,
- false);
- pool = poolMBean.getInstance();
+ maxNumberThreads = Integer.parseInt(maxNumOfThreads);
}
- else
+ catch(NumberFormatException e)
{
- throw new RuntimeException("Can not register MBean ThreadPool as the ServerInvoker has not been registered with a MBeanServer.");
+ log.error("Can not convert max number of threads value (" +
+ maxNumOfThreads + ") into a number.");
}
- return pool;
}
+ onewayThreadPoolClass = (String)config.get(ONEWAY_THREAD_POOL_CLASS_KEY);
- public MBeanServer getMBeanServer()
+ String locatorHost = locator.getHost();
+ InetAddress addr = null;
+ if(locatorHost != null)
{
- return mbeanServer;
+ addr = InetAddress.getByName(locator.getHost());
}
-
- public void setMBeanServer(MBeanServer server)
+ else
{
- /**
- * This has been added in order to support mbean service configuration.
- * Now supporting classes, such as the ServerInvokerCallbackHandler can find and use
- * resources such as CallbackStore, which can be run as a service mbean (and specified
- * via object name within config). The use of JMX throughout remoting is a problem as
- * now have to tie it in all throughout the code for service configuration as is being
- * done here. When migrate to use under new server model, which does not depend on JMX,
- * can rip out code such as this.
- */
- this.mbeanServer = server;
+ addr = InetAddress.getLocalHost();
}
-
- /**
- * return true if a server invocation handler has been registered for this subsystem
- *
- * @param subsystem
- * @return
- */
- public synchronized boolean hasInvocationHandler(String subsystem)
+ int port = locator.getPort();
+ if(port <= 0)
{
- return handlers.containsKey(subsystem);
+ port = assignPort();
}
- /**
- * Return array of keys for each subsystem this invoker can handle.
- */
- public synchronized String[] getSupportedSubsystems()
+ // set the bind address
+ serverBindAddress = (String)config.get(SERVER_BIND_ADDRESS_KEY);
+ clientConnectAddress = (String)config.get(CLIENT_CONNECT_ADDRESS_KEY);
+ if(serverBindAddress == null)
{
- String subsystems [] = new String[handlers.size()];
- return (String[]) handlers.keySet().toArray(subsystems);
- }
-
- /**
- * Return an array of the server invocation handlers.
- */
- public synchronized ServerInvocationHandler[] getInvocationHandlers()
+ if(clientConnectAddress != null)
{
- ServerInvocationHandler ih [] = new ServerInvocationHandler[handlers.size()];
- return (ServerInvocationHandler[]) handlers.values().toArray(ih);
+ // can't use uri address, as is for client only
+ serverBindAddress = InetAddress.getLocalHost().getHostAddress();
}
-
- /**
- * Add a server invocation handler for a particular subsystem. Typically, subsystems are defined
- * in org.jboss.remoting.Subsystem, however, this can be any string that the caller knows about.
- *
- * @return previous ServerInvocationHandler with the same sybsystem value (case insensitive) or
- * null if a previous one did not exist.
- */
- public synchronized ServerInvocationHandler addInvocationHandler(String subsystem,
- ServerInvocationHandler handler)
+ else
{
- handler.setInvoker(this);
- ServerInvocationHandler oldHandler =
- (ServerInvocationHandler)handlers.put(subsystem.toUpperCase(), handler);
-
- log.debug(this + " added " + handler + " for subsystem '" + subsystem + "'" +
- (oldHandler == null ? "" : ", replacing old handler " + oldHandler));
-
- return oldHandler;
+ serverBindAddress = addr.getHostAddress();
}
-
- /**
- * Remove a subsystem invocation handler.
- */
- public synchronized ServerInvocationHandler removeInvocationHandler(String subsystem)
- {
- ServerInvocationHandler handler =
- (ServerInvocationHandler)handlers.remove(subsystem.toUpperCase());
-
- log.debug(this + (handler == null ?
- " tried to remove handler for " + subsystem + " but no handler found" :
- " removed handler " + handler + " for subsystem '" + subsystem + "'"));
-
- return handler;
}
- /**
- * Get a ServerInvocationHandler for a given subsystem type.
- */
- public synchronized ServerInvocationHandler getInvocationHandler(String subsystem)
+ // set the bind port
+ String serverBindPortString = (String)config.get(SERVER_BIND_PORT_KEY);
+ String clientConnectPortString = (String)config.get(CLIENT_CONNECT_PORT_KEY);
+ if(clientConnectPortString != null)
{
- return (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
+ try
+ {
+ clientConnectPort = Integer.parseInt(clientConnectPortString);
}
-
- public Object invoke(Object invoke) throws IOException
+ catch(NumberFormatException e)
{
- InvocationRequest request = null;
- InvocationResponse response = null;
-
- if(trace) { log.trace("server received invocation " + invoke); }
-
- if(invoke != null && invoke instanceof InvocationRequest)
+ throw new InvalidConfigurationException("Can not set client bind port because can " +
+ "not convert given value (" + clientConnectPortString + ") to a number.");
+ }
+ }
+ if(serverBindPortString != null)
{
- request = (InvocationRequest) invoke;
try
{
-
- Object result = invoke(request);
-
- response = new InvocationResponse(request.getSessionId(),
- result, false, request.getReturnPayload());
+ serverBindPort = Integer.parseInt(serverBindPortString);
+ if(serverBindPort <= 0)
+ {
+ serverBindPort = assignPort();
+ }
}
- catch(Throwable throwable)
+ catch(NumberFormatException e)
{
- response = new InvocationResponse(request.getSessionId(),
- throwable, true, request.getReturnPayload());
+ throw new InvalidConfigurationException("Can not set server bind port because can " +
+ "not convert given value (" + serverBindPortString + ") to a number.");
}
}
else
{
- log.error("server invoker received " + invoke + " as invocation. Must not be null and must be of type InvocationRequest.");
- response = new InvocationResponse(request.getSessionId(),
- new Exception("Error processing invocation request on " + getLocator() + ". Either invocation was null or of wrong type."),
- true, request.getReturnPayload());
+ if(clientConnectPort > 0)
+ {
+ // can't use uri port, as is for client only
+ serverBindPort = PortUtil.findFreePort(locator.getHost());
+ }
+ else
+ {
+ serverBindPort = port;
}
- return response;
-
}
- /**
- * Will get the data type for the marshaller factory so know which marshaller to
- * get to marshal the data. Will first check the locator uri for a 'datatype'
- * parameter and take that value if it exists. Otherwise, will use the
- * default datatype for the client invoker, based on transport.
- *
- * @return
- */
- public String getDataType()
- {
- if(dataType == null)
+ // get timeout config
+ String timeoutPeriod = (String)config.get(TIMEOUT);
+ if(timeoutPeriod != null && timeoutPeriod.length() > 0)
{
- dataType = getDataType(getLocator());
- if(dataType == null)
+ try
{
- dataType = getDefaultDataType();
+ timeout = Integer.parseInt(timeoutPeriod);
}
+ catch(NumberFormatException e)
+ {
+ throw new InvalidConfigurationException("Can not set timeout because can not " +
+ "convert give value (" + timeoutPeriod + ") to a number.");
}
- return dataType;
}
- //TODO: -TME getting of datatype is duplicated in both the RemoteClientInvoker and the ServerInvoker
- private String getDataType(InvokerLocator locator)
- {
- String type = null;
-
- if(locator != null)
+ // config for client lease period
+ String clientLeasePeriod = (String)config.get(CLIENT_LEASE_PERIOD);
+ if(clientLeasePeriod != null)
{
- Map params = locator.getParameters();
- if(params != null)
+ try
{
- type = (String) params.get(InvokerLocator.DATATYPE);
+ long leasePeriodValue = Long.parseLong(clientLeasePeriod);
+ setLeasePeriod(leasePeriodValue);
}
+ catch(NumberFormatException e)
+ {
+ throw new InvalidConfigurationException("Can not set client lease period because " +
+ "can not convert given value (" + clientLeasePeriod + ") to a number.");
}
- return type;
}
- protected abstract String getDefaultDataType();
+ createServerSocketFactory();
+ }
- /**
- * Processes invocation request depending on the invocation type (internal, name based, oneway, etc).
- * Can be called on directly when client and server are local to one another (by-passing serialization)
- *
- * @param invocation
- * @return
- * @throws Throwable
- */
- public Object invoke(InvocationRequest invocation) throws Throwable
- {
- if (isStarted())
+ protected int assignPort() throws IOException
{
- Object param = invocation.getParameter();
- Object result = null;
+ int port;
+ port = PortUtil.findFreePort(locator.getHost());
- // check to see if this is a is alive ping
- if ("$PING$".equals(param))
- {
- //if checking lease, need to update lease flag
- if (leaseManagement)
- {
- //NOTE we only update the lease when we receive a PING, not for
- //all invocations
- updateClientLease(invocation);
- }
+ // re-write locator since the port is different
+ InvokerLocator newLocator = new InvokerLocator(locator.getProtocol(), locator.getHost(), port,
+ locator.getPath(), locator.getParameters());
- // if this is an invocation ping, just pong back
- Map responseMap = new HashMap();
- responseMap.put(CLIENT_LEASE_PERIOD, new Long(leasePeriod));
- return new InvocationResponse(invocation.getSessionId(), new Boolean(leaseManagement), false, responseMap);
+ // need to update the locator key used in the invoker registry
+ InvokerRegistry.updateServerInvokerLocator(locator, newLocator);
+ this.locator = newLocator;
+ return port;
}
- if ("$DISCONNECT$".equals(param))
- {
- if (leaseManagement)
+ protected ServerSocketFactory createServerSocketFactory() throws IOException
{
- terminateLease(invocation);
- }
- return null;
- }
-
- //TODO: -TME both oneway and internal invocation will be broken since have not
- // deserialized the para yet (removed ClassUtil.deserialize() so would let handler do it).
- if (param instanceof OnewayInvocation)
+ // only want to look at config if server socket factory has not already been set
+ if(serverSocketFactory == null)
{
- handleOnewayInvocation((OnewayInvocation) param, invocation);
- }
- else // no point in delaying return to client if oneway
+ Object obj = configuration.get(Remoting.CUSTOM_SERVER_SOCKET_FACTORY);
+ if (obj != null)
{
- String subsystem = invocation.getSubsystem();
- String clientId = invocation.getSessionId();
-
- // too bad we can't optimize this a little better, since we take a lookup hit for
- // each invocation -JGH
- ServerInvocationHandler handler = null;
- if (subsystem != null)
+ if (obj instanceof ServerSocketFactory)
{
- handler = (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
+ serverSocketFactory = (ServerSocketFactory)obj;
}
else
{
- // subsystem not specified, so will hope for a default one being set
- if (!handlers.isEmpty())
- {
- handler = (ServerInvocationHandler) handlers.values().iterator().next();
+ throw new RuntimeException("Can not set custom server socket factory (" + obj +
+ ") as is not of type javax.net.SocketFactory");
}
}
- if (param instanceof InternalInvocation)
+
+ if (serverSocketFactory == null)
{
- result = handleInternalInvocation((InternalInvocation) param, invocation, handler);
+ // TODO: -TME This is another big hack because of dependency on JMX within configuration
+ // Have to wait till the mbean server is set before can actually set the server socket
+ // factory since it is an mbean (new server's DI will fix all this). Would prefer this
+ // to be in the setup() method...
+ // Also, I can't cast the mbean proxy directly to ServerSocketFactory because it is not
+ // an interface. Therefore, have to require that ServerSocketFactoryMBean is used.
- }
- else
+ String serverSocketFactoryString = (String)configuration.get(SERVER_SOCKET_FACTORY);
+ if(serverSocketFactoryString != null && serverSocketFactoryString.length() > 0)
{
- if (trace) { log.trace(this + " dispatching " + invocation + " from client " + clientId + " to subsystem '" + subsystem + "'"); }
+ try
+ {
+ if(serverSocketFactoryString != null)
+ {
+ MBeanServer server = getMBeanServer();
+ ObjectName serverSocketFactoryObjName =
+ new ObjectName(serverSocketFactoryString);
- if (handler == null)
+ if(server != null)
{
- throw new InvalidConfigurationException("Can not handle invocation request for subsystem("+subsystem+") because there are no " +
- "matching ServerInvocationHandlers registered. Please add via xml configuration " +
- "or via the Connector's addInvocationHandler() method.");
- }
- result = handler.invoke(invocation);
+ try
+ {
+ ServerSocketFactoryMBean serverSocketFactoryMBean =
+ (ServerSocketFactoryMBean)MBeanServerInvocationHandler.
+ newProxyInstance(server, serverSocketFactoryObjName,
+ ServerSocketFactoryMBean.class, false);
+ serverSocketFactory =
+ new ServerSocketFactoryWrapper(serverSocketFactoryMBean);
}
-
- if (trace) { log.trace(this + " successfully dispatched invocation, returning " + result + " from subsystem '" + subsystem + "' to client " + clientId); }
+ catch(Exception e)
+ {
+ log.debug("Error creating mbean proxy for server socket factory " +
+ "for object name " + serverSocketFactoryObjName + ". " +
+ "Will try by class name.");
}
-
- return result;
}
else
{
- log.warn(this + " can not process invocation requests since is not in started state!");
- throw new InvalidStateException("Can not process invocation request since is not in started state.");
+ log.debug("The 'serverSocketFactory' attribute was set with a value, " +
+ "but the MBeanServer reference is null.");
}
}
-
- private void terminateLease(InvocationRequest invocation)
- {
- if (invocation != null)
+ }
+ catch(MalformedObjectNameException e)
{
- String clientSessionId = invocation.getSessionId();
- Lease clientLease = (Lease) clientLeases.get(clientSessionId);
- if (clientLease != null)
+ log.debug("Attibute value (" + serverSocketFactoryString + ") passed is not a " +
+ "valid ObjectName. Can not look up if is a mbean service. Will try by classname.");
+ }
+ catch(NullPointerException e)
{
- boolean clientOnlyTerminated = false;
- // now have to determine if is just Client that disconnected
- // or if all Clients disconnected, thus the client invoker
- // is also disconnected as well.
- Map reqMap = invocation.getRequestPayload();
- if (reqMap != null)
+ log.debug("Could not set up the server socket factory as a mbean service " +
+ "due to null pointer exception.");
+ }
+
+ // couldn't create from object name for mbean service, will try by class name
+ if(serverSocketFactory == null)
{
- Object holderObj = reqMap.get(ClientHolder.CLIENT_HOLDER_KEY);
- if (holderObj != null && holderObj instanceof ClientHolder)
+ try
{
- // just a client that disconnected, so only need to terminate lease for
- // that particular client (by client session id).
- ClientHolder holder = (ClientHolder) holderObj;
- clientLease.terminateLease(holder.getSessionId());
- clientOnlyTerminated = true;
- }
- }
+ Class cl = ClassLoaderUtility.loadClass(serverSocketFactoryString, getClass());
- // now see if client invoker needs to be terminated
- if (!clientOnlyTerminated)
+ Constructor serverSocketConstructor = null;
+ serverSocketConstructor = cl.getConstructor(new Class[]{});
+ serverSocketFactory =
+ (ServerSocketFactory)serverSocketConstructor.newInstance(new Object[] {});
+ log.trace("ServerSocketFactory (" + serverSocketFactoryString + ") loaded");
+ }
+ catch(Exception e)
{
- clientLease.terminateLease(clientSessionId);
- clientLeases.remove(clientSessionId);
+ log.debug("Could not create server socket factory by classname (" +
+ serverSocketFactoryString + "). Error message: " + e.getMessage());
}
}
- else
- {
- log.warn("Asked to terminate lease for client session id " + clientSessionId + ", but lease for this id could not be found.");
}
}
}
- private void updateClientLease(InvocationRequest invocation)
- {
- if(invocation != null)
+ if (serverSocketFactory == null && needsCustomSSLConfiguration(configuration))
{
- String clientSessionId = invocation.getSessionId();
- if(clientSessionId != null)
+ try
{
- if(trace)
+ SSLSocketBuilder socketBuilder = new SSLSocketBuilder(configuration);
+ socketBuilder.setUseSSLServerSocketFactory(false);
+ serverSocketFactory = socketBuilder.createSSLServerSocketFactory();
+ }
+ catch (IOException e)
{
- log.trace("Getting lease for client session id: " + clientSessionId);
+ throw new RuntimeException("Unable to create customized SSL socket factory", e);
+ }
}
- Lease clientLease = (Lease)clientLeases.get(clientSessionId);
- if(clientLease == null)
+ if(serverSocketFactory == null)
{
- Lease newClientLease = new Lease(clientSessionId, leasePeriod,
- locator.getLocatorURI(), invocation.getRequestPayload(),
- connectionNotifier,
- clientLeases);
- clientLeases.put(clientSessionId, newClientLease);
- newClientLease.startLease();
+ log.debug(this + " did not find server socket factory configuration as mbean service " +
+ "or classname. Creating default server socket factory.");
- if(trace)
- {
- log.trace("No lease established for client session id (" + clientSessionId + "), so starting a new one.");
+ serverSocketFactory = getDefaultServerSocketFactory();
}
+
+ log.debug(this + " created server socket factory " + serverSocketFactory);
+
+ serverSocketFactory = wrapServerSocketFactory(serverSocketFactory, configuration);
+ return serverSocketFactory;
+
}
- else
- {
- // including request payload from invocation as may contain updated list of clients.
- clientLease.updateLease(leasePeriod, invocation.getRequestPayload());
- if(trace)
+ protected boolean justNeedsSSLClientMode(Map configuration)
{
- log.trace("Updated lease for client session id (" + clientSessionId + ")");
- }
+ if (configuration.size() == 1 &&
+ configuration.containsKey(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE))
+ {
+ String useClientModeString =
+ (String)configuration.get(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE);
+ return Boolean.valueOf(useClientModeString).booleanValue();
}
+
+ if (configuration.size() == 1 &&
+ configuration.containsKey(SSLSocketBuilder.REMOTING_SOCKET_USE_CLIENT_MODE))
+ {
+ String useClientModeString =
+ (String)configuration.get(SSLSocketBuilder.REMOTING_SOCKET_USE_CLIENT_MODE);
+ return Boolean.valueOf(useClientModeString).booleanValue();
}
+
+ if (configuration.size() == 2
+ && configuration.containsKey(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE)
+ && configuration.containsKey(SSLSocketBuilder.REMOTING_SOCKET_USE_CLIENT_MODE))
+ {
+ String useClientModeString =
+ (String)configuration.get(SSLSocketBuilder.REMOTING_SERVER_SOCKET_USE_CLIENT_MODE);
+ return Boolean.valueOf(useClientModeString).booleanValue();
}
+
+ return false;
}
/**
- * Takes the real invocation from the client out of the OnewayInvocation and then executes the invoke()
- * with the real invocation on a seperate thread.
- *
- * @param onewayInvocation
- * @param invocation
- * @throws Throwable
+ * Gets the default server socket factory to use for the server invoker. The intention is this
+ * method will be overridden by sub-classes for their specific defaults.
*/
- private void handleOnewayInvocation(OnewayInvocation onewayInvocation, InvocationRequest invocation) throws Throwable
+ protected ServerSocketFactory getDefaultServerSocketFactory() throws IOException
{
- Object[] objs = onewayInvocation.getParameters();
- // The oneway invocation should contain the real param as it's only param in parameter array
- Object realParam = objs[0];
- invocation.setParameter(realParam);
- final InvocationRequest newInvocation = invocation;
+ return ServerSocketFactory.getDefault();
+ }
- ThreadPool executor = getOnewayThreadPool();
- Runnable onewayRun = new Runnable()
+ protected ServerSocketFactory wrapServerSocketFactory(ServerSocketFactory ssf, Map config)
{
- public void run()
+ if (config == null)
+ {
+ return ssf;
+ }
+
+ Object o = config.get(Remoting.SOCKET_CREATION_SERVER_LISTENER);
+
+ if (o == null)
+ {
+ return ssf;
+ }
+
+ if (o instanceof SocketCreationListener)
+ {
+ return new CreationListenerServerSocketFactory(ssf, (SocketCreationListener) o);
+ }
+ else if (o instanceof String)
{
try
{
- invoke(newInvocation);
+ Class c = ClassLoaderUtility.loadClass((String) o, ServerInvoker.class);
+ SocketCreationListener listener = (SocketCreationListener)c.newInstance();
+ return new CreationListenerServerSocketFactory(ssf, listener);
}
- catch(Throwable e)
+ catch (Exception e)
{
- // throw away exception since can't get it back to original caller
- log.error("Error executing server oneway invocation request: " + newInvocation, e);
+ log.error("unable to instantiate class: " + o, e);
+ return ssf;
}
}
- };
- executor.run(onewayRun);
+ else
+ {
+ log.error("unrecognized type for socket creation server listener: " + o);
+ return ssf;
+ }
}
/**
- * Handles both internal and external invocations (internal meaning only
- * to be used within remoting and external for ones that go to handlers.
- *
- * @param param
- * @param invocation
- * @param handler
- * @return
- * @throws Throwable
+ * Handles both internal and external invocations (internal meaning only to be used within
+ * remoting and external for ones that go to handlers).
*/
protected Object handleInternalInvocation(InternalInvocation param,
InvocationRequest invocation,
- ServerInvocationHandler handler)
- throws Throwable
+ ServerInvocationHandler handler) throws Throwable
{
Object result = null;
String methodName = param.getMethodName();
- if(trace)
- {
- log.trace("handling InternalInvocation where method name = " + methodName);
- }
+
+ if(trace) { log.trace("handling InternalInvocation where method name = " + methodName); }
+
// check if the invocation is for callback handling
if(InternalInvocation.ADDLISTENER.equals(methodName))
{
if(handler == null)
{
- throw new InvalidConfigurationException("Can not accept a callback listener since there are no " +
- "ServerInvocationHandlers registered. Please add via xml configuration " +
- "or via the Connector's addInvocationHandler() method.");
+ throw new InvalidConfigurationException(
+ "Can not accept a callback listener since there are no ServerInvocationHandlers " +
+ "registered. Please add via xml configuration or via the Connector's " +
+ "addInvocationHandler() method.");
}
InvokerCallbackHandler callbackHandler = getCallbackHandler(invocation);
@@ -1213,85 +1211,87 @@
{
if(handler == null)
{
- throw new InvalidConfigurationException("Can not remove a callback listener since there are no " +
- "ServerInvocationHandlers registered. Please add via xml configuration " +
- "or via the Connector's addInvocationHandler() method.");
+ throw new InvalidConfigurationException(
+ "Can not remove a callback listener since there are no ServerInvocationHandlers " +
+ "registered. Please add via xml configuration or via the Connector's " +
+ "addInvocationHandler() method.");
}
handler.removeListener(callbackHandler);
- if(trace)
- {
- log.trace("ServerInvoker (" + this + ") removing server callback handler " + callbackHandler + ".");
- }
+
+ if(trace) { log.trace("ServerInvoker (" + this + ") removing server callback handler " + callbackHandler + "."); }
callbackHandler.destroy();
}
else
{
String sessionId = ServerInvokerCallbackHandler.getId(invocation);
- throw new RuntimeException("Can not remove callback listener from target server with id of " + sessionId + " as it does not exist as a registered callback listener.");
+ throw new RuntimeException("Can not remove callback listener from target server with " +
+ "id of " + sessionId + " as it does not exist as a registered callback listener.");
}
}
else if(InternalInvocation.GETCALLBACKS.equals(methodName))
{
ServerInvokerCallbackHandler callbackHandler = getCallbackHandler(invocation);
- if(trace)
- {
- log.trace("ServerInvoker (" + this + ") getting callbacks for callback handler " + callbackHandler + ".");
- }
-
+ if(trace) { log.trace("ServerInvoker (" + this + ") getting callbacks for callback handler " + callbackHandler + "."); }
result = callbackHandler.getCallbacks();
}
else if(InternalInvocation.ACKNOWLEDGECALLBACK.equals(methodName))
{
ServerInvokerCallbackHandler callbackHandler = getCallbackHandler(invocation);
- if(trace)
- {
- log.trace("ServerInvoker (" + this + ") acknowledge callback on callback handler " + callbackHandler + ".");
- }
-
+ if(trace) { log.trace("ServerInvoker (" + this + ") acknowledge callback on callback handler " + callbackHandler + "."); }
callbackHandler.acknowledgeCallbacks(param);
}
else if(InternalInvocation.ADDCLIENTLISTENER.equals(methodName))
{
String sessionId = ServerInvokerCallbackHandler.getId(invocation);
Object[] params = param.getParameters();
+
// the only elements should be the callback handler and possibly the callback handle object
if(params == null || params.length < 0 || params.length > 3)
{
log.error("Recieved addClientListener InternalInvocation, but getParameters() " +
"returned: " + params);
- throw new RuntimeException("InvokerCallbackHandler and callback handle object (optional) must be supplied as the only " +
- "parameter objects within the InternalInvocation when " +
- "calling addClientListener.");
+ throw new RuntimeException(
+ "InvokerCallbackHandler and callback handle object (optional) must be supplied as " +
+ "the only parameter objects within the InternalInvocation when calling " +
+ "addClientListener.");
}
- InvokerCallbackHandler callbackHandler = (InvokerCallbackHandler) params[0];
+
+ InvokerCallbackHandler callbackHandler = (InvokerCallbackHandler)params[0];
Object callbackHandleObject = params[1];
- CallbackContainer callbackContainer = new CallbackContainer(callbackHandler, callbackHandleObject);
+ CallbackContainer callbackContainer =
+ new CallbackContainer(callbackHandler, callbackHandleObject);
clientCallbackListener.put(sessionId, callbackContainer);
- log.debug("ServerInvoker (" + this + ") added client callback handler " + callbackHandler + " with session id of " + sessionId +
- " and callback handle object of " + callbackHandleObject + ".");
+ log.debug("ServerInvoker (" + this + ") added client callback handler " + callbackHandler +
+ " with session id of " + sessionId + " and callback handle object of " +
+ callbackHandleObject + ".");
}
else if(InternalInvocation.REMOVECLIENTLISTENER.equals(methodName))
{
String sessionId = ServerInvokerCallbackHandler.getId(invocation);
- log.debug("ServerInvoker (" + this + ") removing client callback handler with session id of " + sessionId + ".");
+
+ log.debug("ServerInvoker (" + this + ") removing client callback handler with session " +
+ "id of " + sessionId + ".");
+
Object cbo = clientCallbackListener.remove(sessionId);
if(cbo == null)
{
- throw new RuntimeException("Can not remove callback listener from callback server with id of " + sessionId + " as it does not exist as a registered callback listener.");
+ throw new RuntimeException(
+ "Can not remove callback listener from callback server with id of " + sessionId +
+ " as it does not exist as a registered callback listener.");
}
}
else if(InternalInvocation.HANDLECALLBACK.equals(methodName))
{
String sessionId = ServerInvokerCallbackHandler.getId(invocation);
- if(trace)
- {
- log.trace("ServerInvoker (" + this + ") is being asked to deliver callback on client callback handler with session id of " + sessionId + ".");
- }
- CallbackContainer callbackContainer = (CallbackContainer) clientCallbackListener.get(sessionId);
+ if(trace) { log.trace("ServerInvoker (" + this + ") is being asked to deliver callback on client callback handler with session id of " + sessionId + "."); }
+
+ CallbackContainer callbackContainer =
+ (CallbackContainer) clientCallbackListener.get(sessionId);
+
if(callbackContainer != null && callbackContainer.getCallbackHandler() != null)
{
Object[] params = param.getParameters();
@@ -1301,7 +1301,8 @@
{
callbackHandleObject = new HashMap();
}
- callbackHandleObject.put(Callback.CALLBACK_HANDLE_OBJECT_KEY, callbackContainer.getCallbackHandleObject());
+ callbackHandleObject.put(Callback.CALLBACK_HANDLE_OBJECT_KEY,
+ callbackContainer.getCallbackHandleObject());
callbackRequest.setReturnPayload(callbackHandleObject);
InvokerCallbackHandler callbackHandler = callbackContainer.getCallbackHandler();
callbackHandler.handleCallback(callbackRequest);
@@ -1311,227 +1312,244 @@
log.error("Could not find callback handler to call upon for handleCallback " +
"where session id equals " + sessionId);
}
-
-
}
else if(InternalInvocation.ADDSTREAMCALLBACK.equals(methodName))
{
StreamHandler streamHandler = getStreamHandler(invocation);
if(handler instanceof StreamInvocationHandler)
{
- InternalInvocation inv = (InternalInvocation) invocation.getParameter();
+ InternalInvocation inv = (InternalInvocation)invocation.getParameter();
// second parameter should be the param payload
- result = ((StreamInvocationHandler) handler).handleStream(streamHandler, (InvocationRequest)inv.getParameters()[1]);
+ result = ((StreamInvocationHandler)handler).
+ handleStream(streamHandler, (InvocationRequest)inv.getParameters()[1]);
}
else
{
log.error("Client request is an InputStream, but the registered handlers do not " +
"implement the StreamInvocationHandler interface, so could not process call.");
- throw new RuntimeException("No handler registered of proper type (StreamInvocationHandler).");
+ throw new RuntimeException(
+ "No handler registered of proper type (StreamInvocationHandler).");
}
}
else
{
log.error("Error processing InternalInvocation. Unable to process method " +
methodName + ". Please make sure this should be an InternalInvocation.");
- throw new RuntimeException("Error processing InternalInvocation. Unable to process method " +
- methodName);
+ throw new RuntimeException(
+ "Error processing InternalInvocation. Unable to process method " + methodName);
}
return result;
}
- private StreamHandler getStreamHandler(InvocationRequest invocation) throws Exception
+ /**
+ * Called prior to an invocation.
+ * TODO is sending in the arg appropriate?
+ */
+ protected void preProcess(String sessionId, ClassBytes arg, Map payload, InvokerLocator locator)
{
- InternalInvocation inv = (InternalInvocation) invocation.getParameter();
- String locator = (String) inv.getParameters()[0];
- StreamHandler streamHandler = new StreamHandler(locator);
- //StreamHandler streamHandler = StreamHandler.createStreamHandler(locator);
- return streamHandler;
}
- private ServerInvokerCallbackHandler getCallbackHandler(InvocationRequest invocation) throws Exception
+ /**
+ * Called after an invocation.
+ * TODO is sending in the arg appropriate?
+ */
+ protected void postProcess(String sessionId, Object param, Map payload, InvokerLocator locator)
{
- ServerInvokerCallbackHandler callbackHandler = null;
- String id = ServerInvokerCallbackHandler.getId(invocation);
- synchronized(callbackHandlers)
+ }
+
+ // Private --------------------------------------------------------------------------------------
+
+ private ThreadPool createThreadPoolProxy(ObjectName objName)
{
- callbackHandler = (ServerInvokerCallbackHandler) callbackHandlers.get(id);
- // if does not exist, create it
- if(callbackHandler == null)
+ ThreadPool pool;
+ MBeanServer server = getMBeanServer();
+ if(server != null)
{
- callbackHandler = new ServerInvokerCallbackHandler(invocation, getLocator(), this);
- callbackHandlers.put(id, callbackHandler);
- }
+ ThreadPoolMBean poolMBean = (ThreadPoolMBean)MBeanServerInvocationHandler.
+ newProxyInstance(server, objName, ThreadPoolMBean.class, false);
+
+ pool = poolMBean.getInstance();
}
- if(trace)
+ else
{
- log.trace("ServerInvoker (" + this + ") adding server callback handler " + callbackHandler + " with id of " + id + ".");
+ throw new RuntimeException("Can not register MBean ThreadPool as the ServerInvoker " +
+ "has not been registered with a MBeanServer.");
}
-
- return callbackHandler;
+ return pool;
}
- private ServerInvokerCallbackHandler removeCallbackHandler(InvocationRequest invocation)
+ //TODO: -TME getting of datatype is duplicated in both the RemoteClientInvoker and the ServerInvoker
+ private String getDataType(InvokerLocator locator)
{
- String id = ServerInvokerCallbackHandler.getId(invocation);
- ServerInvokerCallbackHandler callbackHandler = null;
+ String type = null;
- synchronized(callbackHandlers)
+ if(locator != null)
{
- callbackHandler = (ServerInvokerCallbackHandler) callbackHandlers.remove(id);
+ Map params = locator.getParameters();
+ if(params != null)
+ {
+ type = (String) params.get(InvokerLocator.DATATYPE);
}
- return callbackHandler;
}
-
-
- /**
- * called prior to an invocation
- *
- * @param sessionId
- * @param payload
- * @param locator
- * @todo is sending in the arg appropriate???
- */
- protected void preProcess(String sessionId, ClassBytes arg, Map payload, InvokerLocator locator)
- {
+ return type;
}
- /**
- * called after an invocation
- *
- * @param sessionId
- * @param payload
- * @param locator
- * @todo is sending in the arg appropriate???
- */
- protected void postProcess(String sessionId, Object param, Map payload, InvokerLocator locator)
+ private void terminateLease(InvocationRequest invocation)
{
- }
-
- public void create()
+ if (invocation != null)
{
- if(!created)
+ String clientSessionId = invocation.getSessionId();
+ Lease clientLease = (Lease)clientLeases.get(clientSessionId);
+
+ if (clientLease != null)
{
- try
+ boolean clientOnlyTerminated = false;
+ // now have to determine if is just Client that disconnected
+ // or if all Clients disconnected, thus the client invoker
+ // is also disconnected as well.
+ Map reqMap = invocation.getRequestPayload();
+ if (reqMap != null)
{
- setup();
- }
- catch(Exception e)
+ Object holderObj = reqMap.get(ClientHolder.CLIENT_HOLDER_KEY);
+ if (holderObj != null && holderObj instanceof ClientHolder)
{
- throw new RuntimeException("Error setting up server invoker " + this, e);
- }
- created = true;
+ // just a client that disconnected, so only need to terminate lease for
+ // that particular client (by client session id).
+ ClientHolder holder = (ClientHolder) holderObj;
+ clientLease.terminateLease(holder.getSessionId());
+ clientOnlyTerminated = true;
}
}
- /**
- * subclasses should override to provide any specific start logic
- *
- * @throws IOException
- */
- public void start() throws IOException
+ // now see if client invoker needs to be terminated
+ if (!clientOnlyTerminated)
{
- started = true;
- log.debug(this + " started for locator " + getLocator());
+ clientLease.terminateLease(clientSessionId);
+ clientLeases.remove(clientSessionId);
}
-
- /**
- * return true if the server invoker is started, false if not
- */
- public boolean isStarted()
+ }
+ else
{
- return started;
+ log.warn("Asked to terminate lease for client session id " + clientSessionId +
+ ", but lease for this id could not be found.");
+ }
+ }
}
- /**
- * subclasses should override to provide any specific stop logic
- */
- public void stop()
+ private void updateClientLease(InvocationRequest invocation)
{
- started = false;
+ if(invocation != null)
+ {
+ String clientSessionId = invocation.getSessionId();
+ if(clientSessionId != null)
+ {
+ if(trace) { log.trace("Getting lease for client session id: " + clientSessionId); }
- Iterator it = callbackHandlers.values().iterator();
- while (it.hasNext())
+ Lease clientLease = (Lease)clientLeases.get(clientSessionId);
+ if(clientLease == null)
{
- ServerInvokerCallbackHandler callbackHandler = (ServerInvokerCallbackHandler) it.next();
- callbackHandler.destroy();
+ Lease newClientLease = new Lease(clientSessionId, leasePeriod,
+ locator.getLocatorURI(),
+ invocation.getRequestPayload(),
+ connectionNotifier,
+ clientLeases);
+
+ clientLeases.put(clientSessionId, newClientLease);
+ newClientLease.startLease();
+
+ if(trace) { log.trace("No lease established for client session id (" + clientSessionId + "), so starting a new one."); }
}
+ else
+ {
+ // including request payload from invocation as may contain updated list of clients.
+ clientLease.updateLease(leasePeriod, invocation.getRequestPayload());
- log.debug(this + " stopped");
+ if(trace) { log.trace("Updated lease for client session id (" + clientSessionId + ")"); }
+ }
+ }
+ }
}
/**
- * destory the invoker permanently
+ * Takes the real invocation from the client out of the OnewayInvocation and then executes the
+ * invoke() with the real invocation on a separate thread.
*/
- public void destroy()
+ private void handleOnewayInvocation(OnewayInvocation onewayInvocation,
+ InvocationRequest invocation) throws Throwable
{
- if(classbyteloader != null)
+ Object[] objs = onewayInvocation.getParameters();
+
+ // The oneway invocation should contain the real param as its only param in the parameter array
+ Object realParam = objs[0];
+ invocation.setParameter(realParam);
+ final InvocationRequest newInvocation = invocation;
+
+ ThreadPool executor = getOnewayThreadPool();
+ Runnable onewayRun = new Runnable()
{
- classbyteloader.destroy();
- }
+ public void run()
+ {
+ try
+ {
+ invoke(newInvocation);
}
-
- /**
- * Sets the server invoker's transport specific configuration. Will need to set before calling
- * start() method (or at least stop() and start() again) before configurations will take affect.
- *
- * @param configuration
- */
- public void setConfiguration(Map configuration)
+ catch(Throwable e)
{
- this.configuration = configuration;
+ // throw away exception since can't get it back to original caller
+ log.error("Error executing server oneway invocation request: " + newInvocation, e);
+ }
+ }
+ };
+ executor.run(onewayRun);
}
- /**
- * Gets the server invoker's transport specific configuration.
- */
- public Map getConfiguration()
+ private StreamHandler getStreamHandler(InvocationRequest invocation) throws Exception
{
- return configuration;
+ InternalInvocation inv = (InternalInvocation)invocation.getParameter();
+ String locator = (String)inv.getParameters()[0];
+ return new StreamHandler(locator);
}
- /**
- * Returns the String for the object name to be used for the invoker.
- */
- public String getMBeanObjectName()
+ private ServerInvokerCallbackHandler getCallbackHandler(InvocationRequest invocation)
+ throws Exception
{
- InvokerLocator locator = getLocator();
- StringBuffer buffer = new StringBuffer("jboss.remoting:service=invoker,transport= " + locator.getProtocol());
- buffer.append(",host=" + locator.getHost());
- buffer.append(",port=" + locator.getPort());
- Map param = locator.getParameters();
- if(param != null)
+ ServerInvokerCallbackHandler callbackHandler = null;
+ String id = ServerInvokerCallbackHandler.getId(invocation);
+ synchronized(callbackHandlers)
{
- Iterator itr = param.keySet().iterator();
- while(itr.hasNext())
+ callbackHandler = (ServerInvokerCallbackHandler) callbackHandlers.get(id);
+ // if does not exist, create it
+ if(callbackHandler == null)
{
- buffer.append(",");
- String key = (String) itr.next();
- String value = (String) param.get(key);
- buffer.append(key);
- buffer.append("=");
- buffer.append(value);
+ callbackHandler = new ServerInvokerCallbackHandler(invocation, getLocator(), this);
+ callbackHandlers.put(id, callbackHandler);
}
}
- return buffer.toString();
+
+ if(trace) { log.trace("ServerInvoker (" + this + ") adding server callback handler " + callbackHandler + " with id of " + id + "."); }
+ return callbackHandler;
}
- public void removeCallbackListener(String subsystem, InvokerCallbackHandler callbackHandler)
+ private ServerInvokerCallbackHandler removeCallbackHandler(InvocationRequest invocation)
{
- ServerInvocationHandler handler = null;
- if(subsystem != null)
+ String id = ServerInvokerCallbackHandler.getId(invocation);
+ ServerInvokerCallbackHandler callbackHandler = null;
+
+ synchronized(callbackHandlers)
{
- handler = (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
+ callbackHandler = (ServerInvokerCallbackHandler) callbackHandlers.remove(id);
}
- else
+ return callbackHandler;
+ }
+
+ // Inner classes --------------------------------------------------------------------------------
+
+ public static class InvalidStateException extends Exception
{
- // subsystem not specified, so will hope for a default one being set
- if(!handlers.isEmpty())
+ public InvalidStateException(String msg)
{
- handler = (ServerInvocationHandler) handlers.values().iterator().next();
- }
+ super(msg);
}
- handler.removeListener(callbackHandler);
}
private class CallbackContainer
@@ -1556,12 +1574,4 @@
}
}
- public static class InvalidStateException extends Exception
- {
- public InvalidStateException(String msg)
- {
- super(msg);
- }
-
- }
}
More information about the jboss-cvs-commits
mailing list