[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting ...
Ovidiu Feodorov
ovidiu.feodorov at jboss.com
Sat Jan 20 19:35:27 EST 2007
User: ovidiu
Date: 07/01/20 19:35:27
Modified: src/main/org/jboss/remoting Tag: remoting_2_x Client.java
Log:
more Client.java reformatting. Preparing http://jira.jboss.org/jira/browse/JBREM-679
Revision Changes Path
No revision
No revision
1.53.2.15 +554 -572 JBossRemoting/src/main/org/jboss/remoting/Client.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: Client.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/Client.java,v
retrieving revision 1.53.2.14
retrieving revision 1.53.2.15
diff -u -b -r1.53.2.14 -r1.53.2.15
--- Client.java 20 Jan 2007 23:46:02 -0000 1.53.2.14
+++ Client.java 21 Jan 2007 00:35:27 -0000 1.53.2.15
@@ -57,16 +57,19 @@
import java.util.Map;
/**
- * Client is a convience class for invoking remote methods for a given subsystem.
- * It is intended to be the main user interface for making remote invocation
- * on the client side.
+ * Client is a convience class for invoking remote methods for a given subsystem. It is intended to
+ * be the main user interface for making remote invocation on the client side.
*
* @author <a href="mailto:jhaynie at vocalocity.net">Jeff Haynie</a>
* @author <a href="mailto:telrod at e2technologies.net">Tom Elrod</a>
- * @version $Revision: 1.53.2.14 $
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ *
+ * @version $Revision: 1.53.2.15 $
*/
public class Client implements Externalizable
{
+ // Constants ------------------------------------------------------------------------------------
+
/**
* Key to be used to determine if invocation is to be
* oneway (async).
@@ -114,7 +117,8 @@
/**
* Key for the configuration when adding a callback handler and internal callback server
* connector is created. The value should be the transport protocol to be used. By default
- * will use the same protocol as being used by this client (e.g. http, socket, rmi, multiplex, etc.)
+ * will use the same protocol as being used by this client (e.g. http, socket, rmi, multiplex,
+ * etc.)
*/
public static final String CALLBACK_SERVER_PROTOCOL = "callbackServerProtocol";
@@ -146,13 +150,19 @@
*/
public static final String MAX_ONEWAY_THREAD_POOL_QUEUE_SIZE = "maxOnewayThreadPoolQueueSize";
+ private static final Logger log = Logger.getLogger(Client.class);
+
+ private static final long serialVersionUID = 5679279425009837934L;
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
/**
* Indicated the max number of threads used within oneway thread pool.
*/
private int maxNumberThreads = MAX_NUM_ONEWAY_THREADS_DEFAULT;
private int maxOnewayThreadPoolQueueSize = -1;
- private static final Logger log = Logger.getLogger(Client.class);
private ClientInvoker invoker;
private ClassLoader classloader;
private String subsystem;
@@ -173,15 +183,21 @@
private SocketFactory socketFactory;
- private static final long serialVersionUID = 5679279425009837934L;
+ // Constructors ---------------------------------------------------------------------------------
/**
- * Constructs a remoting client with intended target server specified via the lcoator,
- * without specifing a remote subsystem or including any metadata.
- * Same as calling Client(locator, null, null);
- *
- * @param locator
- * @throws Exception
+ * PLEASE DO NOT USE THIS CONSTRUCTOR OR YOUR COMPUTER WILL BURST INTO FLAMES!!!
+ * It is only here so can externalize object and will provide a dead object if invoker is not
+ * explicitly set. Please use other contructors provided.
+ */
+ public Client()
+ {
+ }
+
+ /**
+ * Constructs a remoting client with intended target server specified via the locator, without
+ * specifing a remote subsystem or including any metadata. Same as calling Client(locator, null,
+ * null).
*/
public Client(InvokerLocator locator) throws Exception
{
@@ -189,15 +205,11 @@
}
/**
- * Constructs a remoting client with intended target server specified via the locator
- * and configuration metadata. The metadata supplied will be used when creating client
- * invoker (in the case specific data is required) and also for passing along additional
- * data to connection listeners on the server side in the case that the client fails, will
- * be able to use this extra information when notified.
- *
- * @param locator
- * @param configuration
- * @throws Exception
+ * Constructs a remoting client with intended target server specified via the locator and
+ * configuration metadata. The metadata supplied will be used when creating client invoker (in
+ * the case specific data is required) and also for passing along additional data to connection
+ * listeners on the server side in the case that the client fails, will be able to use this extra
+ * information when notified.
*/
public Client(InvokerLocator locator, Map configuration) throws Exception
{
@@ -205,54 +217,38 @@
}
/**
- * Constructs a remoting client with intended target server specified via the locator
- * and intended subsystem on server for invocations to be routed to.
- *
- * @param locator
- * @param subsystem
- * @throws Exception
+ * Constructs a remoting client with intended target server specified via the locator and
+ * intended subsystem on server for invocations to be routed to.
*/
- public Client(InvokerLocator locator, String subsystem)
- throws Exception
+ public Client(InvokerLocator locator, String subsystem) throws Exception
{
this(locator, subsystem, null);
}
/**
- * Constructs a remoting client with intended target server specified via the locator, intended subsystem
- * on the server for invocations to be routed to, and configuration metadata.
- * The metadata supplied will be used when creating client
- * invoker (in the case specific data is required) and also for passing along additional
- * data to connection listeners on the server side in the case that the client fails, will
- * be able to use this extra information when notified.
- *
- * @param locator
- * @param subsystem
- * @param configuration
- * @throws Exception
+ * Constructs a remoting client with intended target server specified via the locator, intended
+ * subsystem on the server for invocations to be routed to, and configuration metadata. The
+ * metadata supplied will be used when creating client invoker (in the case specific data is
+ * required) and also for passing along additional data to connection listeners on the server
+ * side in the case that the client fails, will be able to use this extra information when
+ * notified.
*/
- public Client(InvokerLocator locator, String subsystem, Map configuration)
- throws Exception
+ public Client(InvokerLocator locator, String subsystem, Map configuration) throws Exception
{
this(Thread.currentThread().getContextClassLoader(), locator, subsystem, configuration);
}
/**
- * Constructs a remoting client with intended target server specified via the locator, intended subsystem
- * on the server for invocations to be routed to, and configuration metadata.
- * The metadata supplied will be used when creating client
- * invoker (in the case specific data is required) and also for passing along additional
- * data to connection listeners on the server side in the case that the client fails, will
- * be able to use this extra information when notified (which will happen when connect() method
- * is called.
- *
- * @param cl - the classloader that should be used by remoting
- * @param locator
- * @param subsystem
- * @param configuration
- * @throws Exception
- * @deprecated This constructor should not be used any more as will no longer take into
- * account the classloader specified as a parameter.
+ * Constructs a remoting client with intended target server specified via the locator, intended
+ * subsystem on the server for invocations to be routed to, and configuration metadata. The
+ * metadata supplied will be used when creating client invoker (in the case specific data is
+ * required) and also for passing along additional data to connection listeners on the server
+ * side in the case that the client fails, will be able to use this extra information when
+ * notified (which will happen when connect() method is called.
+ *
+ * @param cl - the classloader that should be used by remoting.
+ * @deprecated This constructor should not be used any more as will no longer take into account
+ * the classloader specified as a parameter.
*/
public Client(ClassLoader cl, InvokerLocator locator, String subsystem, Map configuration)
throws Exception
@@ -261,23 +257,20 @@
this.locator = locator;
this.subsystem = subsystem == null ? null : subsystem.toUpperCase();
if (configuration != null)
+ {
this.configuration = new HashMap(configuration);
+ }
this.sessionId = new GUID().toString();
}
/**
- * Constructs a remoting client with intended target server specified via the locator
- * and intended subsystem on server for invocations to be routed to.
+ * Constructs a remoting client with intended target server specified via the locator and
+ * intended subsystem on server for invocations to be routed to.
*
- * @param cl
- * @param invoker
- * @param subsystem
- * @throws Exception
- * @deprecated This constructor should not be used any more as will no longer take into
- * account the classloader specified as a parameter.
+ * @deprecated This constructor should not be used any more as will no longer take into account
+ * the classloader specified as a parameter.
*/
- public Client(ClassLoader cl, ClientInvoker invoker, String subsystem)
- throws Exception
+ public Client(ClassLoader cl, ClientInvoker invoker, String subsystem) throws Exception
{
this.classloader = cl;
this.subsystem = subsystem == null ? null : subsystem.toUpperCase();
@@ -285,6 +278,55 @@
this.sessionId = new GUID().toString();
}
+ // Externalizable implementation ----------------------------------------------------------------
+
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+ {
+ int version = in.readInt();
+
+ switch (version)
+ {
+ case Version.VERSION_2:
+ {
+ InvokerLocator readLocator = (InvokerLocator) in.readObject();
+ this.subsystem = (String) in.readObject();
+ this.configuration = (Map) in.readObject();
+ boolean wasConnected = in.readBoolean();
+
+ this.classloader = Thread.currentThread().getContextClassLoader();
+ try
+ {
+ this.invoker = InvokerRegistry.createClientInvoker(readLocator, configuration);
+ if(wasConnected)
+ {
+ connect();
+ }
+ }
+ catch (Exception e)
+ {
+ log.error(e);
+ throw new IOException(e.getMessage());
+ }
+
+ break;
+ }
+ default:
+ throw new StreamCorruptedException("Unkown version seen: " + version);
+ }
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException
+ {
+ out.writeInt(Version.getDefaultVersion());
+ out.writeObject(invoker != null ? invoker.getLocator() : locator);
+ out.writeObject(subsystem);
+ out.writeObject(configuration);
+ out.writeBoolean(isConnected());
+ out.flush();
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
/**
* Adds a connection listener that will be notified if/when the connection to the server fails
* while the client is idle (no calls being made). The default behavior is to ping for connection
@@ -337,12 +379,9 @@
}
/**
- * This will set the session id used when making invocations on
- * server invokers. There is a default unique id automatically
- * generated for each Client instance, so unless you have a good reason to set
- * this, do not set this.
- *
- * @param sessionId
+ * This will set the session id used when making invocations on server invokers. There is a
+ * default unique id automatically generated for each Client instance, so unless you have a good
+ * reason to set this, do not set this.
*/
public void setSessionId(String sessionId)
{
@@ -350,10 +389,7 @@
}
/**
- * Gets the configuration map passed when constructing
- * this object.
- *
- * @return
+ * Gets the configuration map passed when constructing this object.
*/
public Map getConfiguration()
{
@@ -361,12 +397,9 @@
}
/**
- * Gets the session id used when making invocations on server invokers.
- * This is the id that will be used for tracking client connections on
- * the server side, to include client failures that are sent to
- * connection listeners on the server side.
- *
- * @return
+ * Gets the session id used when making invocations on server invokers. This is the id that will
+ * be used for tracking client connections on the server side, to include client failures that
+ * are sent to connection listeners on the server side.
*/
public String getSessionId()
{
@@ -374,10 +407,7 @@
}
/**
- * Indicates if the underlying transport has been connected to
- * the target server.
- *
- * @return
+ * Indicates if the underlying transport has been connected to the target server.
*/
public boolean isConnected()
{
@@ -385,11 +415,9 @@
}
/**
- * Will cause the underlying transport to make connection to
- * the target server. This is important for any stateful transports, like socket or multiplex.
- * This is also when a client lease with the server is started.
- *
- * @throws Exception
+ * Will cause the underlying transport to make connection to the target server. This is
+ * important for any stateful transports, like socket or multiplex. This is also when a client
+ * lease with the server is started.
*/
public void connect() throws Exception
{
@@ -411,117 +439,15 @@
}
invoker = InvokerRegistry.createClientInvoker(locator, configuration);
}
- connect(invoker);
- }
-
- private void connect(ClientInvoker invoker)
- {
- if (invoker != null)
- {
- invoker.connect();
- try
- {
- setupClientLease(invoker);
- }
- catch (Throwable throwable)
- {
- RuntimeException e = new RuntimeException("Error setting up client lease upon performing connect.");
- e.initCause(throwable);
- throw e;
- }
- }
- else
- {
- throw new RuntimeException("Client invoker is null (may have used void constructor for Client, which should only be used for Externalization.");
- }
- }
-
- private void setupClientLease(ClientInvoker invoker) throws Throwable
- {
-
- // start with checking the locator url for hint as to if should do initial lease ping
- if (invoker != null)
- {
- if (invoker instanceof LocalClientInvoker)
- {
- // no need to continue as won't do client lease when is local
- // JBREM-382
- return;
- }
-
- InvokerLocator locator = invoker.getLocator();
- Map locatorParams = locator.getParameters();
- if (locatorParams != null)
- {
- String leaseValue = (String) locatorParams.get(InvokerLocator.CLIENT_LEASE);
- if (leaseValue != null && leaseValue.length() > 0)
- {
- enableLease = Boolean.valueOf(leaseValue).booleanValue();
- }
- String leasePeriodValue = (String) locatorParams.get(InvokerLocator.CLIENT_LEASE_PERIOD);
- if (leasePeriodValue != null && leasePeriodValue.length() > 0)
- {
- try
- {
- leasePeriod = Long.parseLong(leasePeriodValue);
- }
- catch (NumberFormatException e)
- {
- log.warn("Could not convert client lease period value (" + leasePeriodValue + ") to a number.");
- }
- }
- }
- }
- else
- {
- throw new RuntimeException("Can not set up client lease as client invoker is null.");
- }
-
- if (configuration != null)
- {
- Object val = configuration.get(ENABLE_LEASE);
- if (val != null)
- {
- if (val instanceof Boolean)
- {
- enableLease = ((Boolean) val).booleanValue();
- }
- else if (val instanceof String)
- {
- enableLease = Boolean.valueOf((String) val).booleanValue();
- }
- else
- {
- log.warn("Can not evaluate " + ENABLE_LEASE + " value (" + val + ") as a boolean type.");
- }
- }
- String leasePeriodValue = (String) configuration.get(InvokerLocator.CLIENT_LEASE_PERIOD);
- if (leasePeriodValue != null && leasePeriodValue.length() > 0)
- {
- try
- {
- leasePeriod = Long.parseLong(leasePeriodValue);
- }
- catch (NumberFormatException e)
- {
- log.warn("Could not convert client lease period value (" + leasePeriodValue + ") to a number.");
- }
- }
-
- }
- if (enableLease)
- {
- invoker.establishLease(sessionId, configuration, leasePeriod);
- }
+ connect(invoker);
}
/**
- * Disconnects the underlying transport from the target server.
- * Also notifies the target server to terminate client lease. Is important
- * that this method is called when no longer using the remoting client. Otherwise
- * resource will not be cleaned up and if the target server requires a lease, it
- * will be maintained in the background.
+ * Disconnects the underlying transport from the target server. Also notifies the target server
+ * to terminate client lease. Is important that this method is called when no longer using the
+ * remoting client. Otherwise resource will not be cleaned up and if the target server requires
+ * a lease, it will be maintained in the background.
*/
public void disconnect()
{
@@ -540,9 +466,8 @@
}
/**
- * Need to remove myself from registry so will not keep
- * reference to me since I am of no use now. Will have to create
- * a new one.
+ * Need to remove myself from registry so will not keep reference to me since I am of no
+ * use now. Will have to create a new one.
*/
InvokerRegistry.destroyClientInvoker(invoker.getLocator(), configuration);
invoker = null;
@@ -593,53 +518,35 @@
}
/**
- * invoke the method remotely
+ * Invoke the method remotely.
*
- * @param param - payload for the server invoker handler
- * @param metadata - any extra metadata that may be needed by the transport (i.e. GET or POST if using
- * http invoker) or if need to pass along extra data to the server invoker handler.
- * @return
- * @throws Throwable
+ * @param param - payload for the server invoker handler.
+ * @param metadata - any extra metadata that may be needed by the transport (i.e. GET or POST if
+ * using http invoker) or if need to pass along extra data to the server invoker handler.
*/
- public Object invoke(Object param, Map metadata)
- throws Throwable
+ public Object invoke(Object param, Map metadata) throws Throwable
{
return invoke(param, metadata, null);
}
- private Object invoke(Object param, Map metadata, InvokerLocator callbackServerLocator)
- throws Throwable
- {
- if (isConnected())
- {
- return invoker.invoke(new InvocationRequest(sessionId, subsystem, param, metadata, null, callbackServerLocator));
- }
- else
- {
- throw new Exception("Can not make remoting client invocation due to not being connected to server.");
- }
- }
-
/**
- * Will invoke a oneway call to server without a return object. This should be used when not expecting a
- * return value from the server and wish to achieve higher performance, since the client will not wait for
- * a return.
+ * Will invoke a oneway call to server without a return object. This should be used when not
+ * expecting a return value from the server and wish to achieve higher performance, since the
+ * client will not wait for a return.
* <b>
- * This is done one of two ways. The first is to pass true as the clientSide param. This will cause the
- * execution of the remote call to be excuted in a new thread on the client side and will return the calling thread
- * before making call to server side. Although, this is optimal for performance, will not know about any problems
- * contacting server.
+ * This is done one of two ways. The first is to pass true as the clientSide param. This will
+ * cause the execution of the remote call to be excuted in a new thread on the client side and
+ * will return the calling thread before making call to server side. Although, this is optimal
+ * for performance, will not know about any problems contacting server.
* <p/>
- * The second, is to pass false as the clientSide param. This will allow the current calling thread to make
- * the call to the remote server, at which point, the server side processing of the thread will be executed on
- * the remote server in a new executing thread and the client thread will return. This is a little slower, but
- * will know that the call made it to the server.
- *
- * @param param
- * @param sendPayload
- * @param clientSide
+ * The second, is to pass false as the clientSide param. This will allow the current calling
+ * thread to make the call to the remote server, at which point, the server side processing of
+ * the thread will be executed on the remote server in a new executing thread and the client
+ * thread will return. This is a little slower, but will know that the call made it to the
+ * server.
*/
- public void invokeOneway(final Object param, final Map sendPayload, boolean clientSide) throws Throwable
+ public void invokeOneway(final Object param, final Map sendPayload, boolean clientSide)
+ throws Throwable
{
final Map internalSendPayload = sendPayload == null ? new HashMap() : sendPayload;
internalSendPayload.put(ONEWAY_FLAG, "true");
@@ -672,12 +579,9 @@
}
/**
- * Sets the maximum queue size to use within client pool for
- * one way invocations on the client side (meaning oneway invocation
- * is handled by thread in this pool and user's call returns immediately)
- * Default value is MAX_NUM_ONEWAY_THREADS.
- *
- * @param maxOnewayThreadPoolQueueSize
+ * Sets the maximum queue size to use within client pool for one way invocations on the client
+ * side (meaning oneway invocation is handled by thread in this pool and user's call returns
+ * immediately). Default value is MAX_NUM_ONEWAY_THREADS.
*/
public void setMaxOnewayThreadPoolQueueSize(int maxOnewayThreadPoolQueueSize)
{
@@ -685,12 +589,9 @@
}
/**
- * Gets the maximum queue size to use within client pool for
- * one way invocations on the client side (meaning oneway invocation
- * is handled by thread in this pool and user's call returns immediately)
- * Default value is MAX_NUM_ONEWAY_THREADS.
- *
- * @return
+ * Gets the maximum queue size to use within client pool for one way invocations on the client
+ * side (meaning oneway invocation is handled by thread in this pool and user's call returns
+ * immediately). Default value is MAX_NUM_ONEWAY_THREADS.
*/
public int getMaxOnewayThreadPoolQueueSize()
{
@@ -698,12 +599,9 @@
}
/**
- * Sets the maximum number of threads to use within client pool for
- * one way invocations on the client side (meaning oneway invocation
- * is handled by thread in this pool and user's call returns immediately)
- * Default value is MAX_NUM_ONEWAY_THREADS.
- *
- * @param numOfThreads
+ * Sets the maximum number of threads to use within client pool for one way invocations on the
+ * client side (meaning oneway invocation is handled by thread in this pool and user's call
+ * returns immediately). Default value is MAX_NUM_ONEWAY_THREADS.
*/
public void setMaxNumberOfThreads(int numOfThreads)
{
@@ -711,12 +609,9 @@
}
/**
- * Gets the maximum number of threads to use within client pool for
- * one way invocations on the client side (meaning oneway invocation
- * is handled by thread in this pool and user's call returns immediately)
- * Default value is MAX_NUM_ONEWAY_THREADS.
- *
- * @return
+ * Gets the maximum number of threads to use within client pool for one way invocations on the
+ * client side (meaning oneway invocation is handled by thread in this pool and user's call
+ * returns immediately). Default value is MAX_NUM_ONEWAY_THREADS.
*/
public int getMaxNumberOfThreads()
{
@@ -724,13 +619,9 @@
}
/**
- * Gets the thread pool being used for making
- * one way invocations on the client side.
- * If one has not be specifically set via configuration
- * or call to set it, will always return instance of
+ * Gets the thread pool being used for making one way invocations on the client side. If one has
+ * not be specifically set via configuration or call to set it, will always return instance of
* org.jboss.util.threadpool.BasicThreadPool.
- *
- * @return
*/
public ThreadPool getOnewayThreadPool()
{
@@ -754,7 +645,9 @@
{
log.error("maxNumberThreads parameter must be in integer format: " + param);
}
+
param = configuration.get(MAX_ONEWAY_THREAD_POOL_QUEUE_SIZE);
+
if (param instanceof String)
{
try
@@ -772,8 +665,11 @@
}
pool.setMaximumPoolSize(maxNumberThreads);
+
if (maxOnewayThreadPoolQueueSize > 0)
+ {
pool.setMaximumQueueSize(maxOnewayThreadPoolQueueSize);
+ }
pool.setBlockingMode(BlockingMode.WAIT);
onewayThreadPool = pool;
}
@@ -781,10 +677,7 @@
}
/**
- * Sets the thread pool to be used for making
- * one way invocations on the client side.
- *
- * @param pool
+ * Sets the thread pool to be used for making one way invocations on the client side.
*/
public void setOnewayThreadPool(ThreadPool pool)
{
@@ -792,15 +685,15 @@
}
/**
- * The socket factory can only be set on the Client before the connect() method
- * has been called. Otherwise, a runtime exception will be thrown.
- * @param socketFactory
+ * The socket factory can only be set on the Client before the connect() method has been called.
+ * Otherwise, a runtime exception will be thrown.
*/
public void setSocketFactory(SocketFactory socketFactory)
{
if(isConnected())
{
- throw new RuntimeException("Cannot set socket factory on Client after the connect() method has been called.");
+ throw new RuntimeException("Cannot set socket factory on Client after " +
+ "the connect() method has been called.");
}
if (invoker != null)
@@ -827,10 +720,8 @@
/**
* Same as calling invokeOneway(Object param, Map sendPayload, boolean clientSide) with
- * clientSide param being false and a null sendPayload. Therefore, client thread will not return till it has made
- * remote call.
- *
- * @param param
+ * clientSide param being false and a null sendPayload. Therefore, client thread will not return
+ * till it has made remote call.
*/
public void invokeOneway(Object param) throws Throwable
{
@@ -841,28 +732,19 @@
* Same as calling invokeOneway(Object param, Map sendPayload, boolean clientSide) with
* clientSide param being false. Therefore, client thread will not return till it has made
* remote call.
- *
- * @param param
- * @param sendPayload
*/
public void invokeOneway(Object param, Map sendPayload) throws Throwable
{
invokeOneway(param, sendPayload, false);
}
-
/**
- * Adds the specified handler as a callback listener for push (async) callbacks.
- * If the transport is uni-directional (e.g. http), remoting will automatically
- * poll for callbacks from the server and deliver them to the callback handler.
- * If the transport is bi-directional (e.g. multiplex), remoting will automatically
- * create a callback server internally and receive and deliver to callback handler the callbacks as
- * they are generated on the server.
- * The metadata map passed will control configuration for how the callbacks are processed,
- * such as the polling frequency.
- *
- * @param callbackhandler
- * @param metadata
+ * Adds the specified handler as a callback listener for push (async) callbacks. If the transport
+ * is uni-directional (e.g. http), remoting will automatically poll for callbacks from the server
+ * and deliver them to the callback handler. If the transport is bi-directional (e.g. multiplex),
+ * remoting will automatically create a callback server internally and receive and deliver to
+ * callback handler the callbacks as they are generated on the server. The metadata map passed
+ * will control configuration for how the callbacks are processed, such as the polling frequency.
*/
public void addListener(InvokerCallbackHandler callbackhandler, Map metadata) throws Throwable
{
@@ -870,46 +752,37 @@
}
/**
- * Adds the specified handler as a callback listener for push (async) callbacks.
- * If the transport is uni-directional (e.g. http), remoting will automatically
- * poll for callbacks from the server and deliver them to the callback handler.
- * If the transport is bi-directional (e.g. multiplex), remoting will automatically
- * create a callback server internally and receive and deliver to callback handler the callbacks as
- * they are generated on the server.
- * The metadata map passed will control configuration for how the callbacks are processed,
- * such as the polling frequency.
+ * Adds the specified handler as a callback listener for push (async) callbacks. If the transport
+ * is uni-directional (e.g. http), remoting will automatically poll for callbacks from the server
+ * and deliver them to the callback handler. If the transport is bi-directional (e.g. multiplex),
+ * remoting will automatically create a callback server internally and receive and deliver to
+ * callback handler the callbacks as they are generated on the server. The metadata map passed
+ * will control configuration for how the callbacks are processed, such as the polling frequency.
*
- * @param callbackhandler
- * @param metadata
- * @param callbackHandlerObject this object will be included in the Callback object instance passed
- * to the InvokerCallbackHandler specified.
+ * @param callbackHandlerObject - this object will be included in the Callback object instance
+ * passed to the InvokerCallbackHandler specified.
*/
- public void addListener(InvokerCallbackHandler callbackhandler, Map metadata, Object callbackHandlerObject)
- throws Throwable
+ public void addListener(InvokerCallbackHandler callbackhandler, Map metadata,
+ Object callbackHandlerObject) throws Throwable
{
addListener(callbackhandler, metadata, callbackHandlerObject, false);
}
/**
- * Adds the specific handler as a callback listener for async callbacks. If the
- * transport support bi-directional calls (meaning server can call back to client
- * over same connection that was established by the client) or if the serverToClient flag
- * is set to true, a callback server will be created internally and the target server
- * will actually send callbacks to the client's internal server. Otherwise, the client
- * will simulate push callbacks by internally polling for callbacks on the server and then deliver
- * them to the callback handler.
- *
- * @param callbackhandler
- * @param metadata
- * @param callbackHandlerObject
- * @param serverToClient if true, will allow server to connect to the client directly (which must
- * be allowed by firewall in front of client unless transport is bi-directional, such as the multiplex transport).
- * If false (and not bi-directional transport), server will not create any new connection to the client.
- * @throws Throwable
+ * Adds the specific handler as a callback listener for async callbacks. If the transport
+ * supports bi-directional calls (meaning server can call back to client over same connection
+ * that was established by the client) or if the serverToClient flag is set to true, a callback
+ * server will be created internally and the target server will actually send callbacks to the
+ * client's internal server. Otherwise, the client will simulate push callbacks by internally
+ * polling for callbacks on the server and then deliver them to the callback handler.
+ *
+ * @param serverToClient - if true, will allow server to connect to the client directly (which
+ * must be allowed by firewall in front of client unless transport is bi-directional, such
+ * as the multiplex transport). If false (and not bi-directional transport), server will
+ * not create any new connection to the client.
*/
public void addListener(InvokerCallbackHandler callbackhandler, Map metadata,
- Object callbackHandlerObject, boolean serverToClient)
- throws Throwable
+ Object callbackHandlerObject, boolean serverToClient) throws Throwable
{
InvokerLocator callbackLocator = null;
@@ -921,7 +794,7 @@
if (isBidirectional || serverToClient)
{
- //setup callback server
+ // setup callback server
String transport = null;
String host = null;
int port = -1;
@@ -940,7 +813,8 @@
}
catch (NumberFormatException e)
{
- log.warn("Could not set the internal callback server port as configuration value (" + sPort + ") is not a number.");
+ log.warn("Could not set the internal callback server port as " +
+ "configuration value (" + sPort + ") is not a number.");
}
}
}
@@ -966,7 +840,8 @@
if(isBidirectional)
{
- callbackLocator = ((BidirectionalClientInvoker)invoker).getCallbackLocator(metadata);
+ callbackLocator =
+ ((BidirectionalClientInvoker)invoker).getCallbackLocator(metadata);
}
else
{
@@ -981,7 +856,8 @@
else
{
//need to setup poller to get callbacks from the server
- CallbackPoller poller = new CallbackPoller(this, callbackhandler, metadata, callbackHandlerObject);
+ CallbackPoller poller =
+ new CallbackPoller(this, callbackhandler, metadata, callbackHandlerObject);
callbackPollers.put(callbackhandler, poller);
poller.start();
}
@@ -990,157 +866,79 @@
}
else
{
- throw new NullPointerException("InvokerCallbackHandler to be added as a listener can not be null.");
+ throw new NullPointerException("InvokerCallbackHandler to be added as " +
+ "a listener can not be null.");
}
}
else
{
- throw new Exception("Can not add callback listener because remoting client is not connected to server.");
+ throw new Exception("Can not add callback listener because " +
+ "remoting client is not connected to server.");
}
}
- private void addCallbackListener(InvokerCallbackHandler callbackhandler, Map metadata,
- InvokerLocator callbackLocator, Object callbackHandlerObject)
- throws Throwable
+ /**
+ * Adds the specified handler as a callback listener for pull (sync) callbacks. Using this method
+ * will require the programatic getting of callbacks from the server (they will not be pushed to
+ * the callback handler automatically).
+ */
+ public void addListener(InvokerCallbackHandler callbackHandler) throws Throwable
{
- // if callback locator is null, then is pull callbacks and need to track callback handler
- // per Client (not by client invoker).
- if (callbackLocator == null)
+ addListener(callbackHandler, (InvokerLocator) null);
+ }
+
+ /**
+ * Adds the specified handler as a callback listener for push (async) callbacks. The invoker
+ * server will then callback on this handler (via the server invoker specified by the
+ * clientLocator) when it gets a callback from the server handler.
+ *
+ * Note: passing a null clientLocator will cause the client invoker's client locator to be set to
+ * null, which basically converts the mode to be pull (sync) where will require call to get
+ * callbacks (as will not automatically be pushed to callback handler).
+ */
+ public void addListener(InvokerCallbackHandler callbackHandler,
+ InvokerLocator clientLocator) throws Throwable
{
- String listenerId = generateListenerId(callbackhandler);
+ addListener(callbackHandler, clientLocator, null);
+ }
- // if listenerId is null, means this Client has already had the callbackhanler reference
- // registered as a listener, so no need to add it again.
- if (listenerId != null)
+ /**
+ * Adds the specified handler as a callback listener for push (async) callbacks. The invoker
+ * server will then callback on this handler (via the server invoker specified by the
+ * clientLocator) when it gets a callback from the server handler.
+ *
+ * Note: passing a null clientLocator will cause the client invoker's client locator to be set to
+ * null, which basically converts the mode to be pull (sync) where will require call to get
+ * callbacks (as will not automatically be pushed to callback handler).
+ *
+ * @param callbackHandlerObject will be included in the callback object passed upon callback.
+ */
+ public void addListener(InvokerCallbackHandler callbackHandler,
+ InvokerLocator clientLocator, Object callbackHandlerObject)
+ throws Throwable
{
- Map internalMetadata = new HashMap();
- internalMetadata.put(LISTENER_ID_KEY, listenerId);
- if(metadata != null)
+ if (callbackHandler != null)
{
- internalMetadata.putAll(metadata);
+ if (isConnected())
+ {
+ addCallbackListener(callbackHandler, null, clientLocator, callbackHandlerObject);
}
- // now call server to add listener
- invoke(new InternalInvocation(InternalInvocation.ADDLISTENER, null), internalMetadata, callbackLocator);
+ else
+ {
+ throw new Exception("Can not add callback listener as " +
+ "remoting client is not connected to server.");
}
}
else
{
- // is going to be push callbacks which means callback server locator involved.
- // will have to delegate to client invoker.
- String listenerId = invoker.addClientLocator(sessionId, callbackhandler, callbackLocator);
-
- if (listenerId != null)
- {
-
- Map internalMetadata = new HashMap();
- internalMetadata.put(LISTENER_ID_KEY, listenerId);
- if(metadata != null)
- {
- internalMetadata.putAll(metadata);
- }
-
- Client client = new Client(callbackLocator, subsystem);
- client.setSessionId(getSessionId());
- client.connect();
-
- try
- {
- client.invoke(new InternalInvocation(InternalInvocation.ADDCLIENTLISTENER,
- new Object[]{callbackhandler, callbackHandlerObject}),
- internalMetadata);
- }
- finally
- {
- client.disconnect();
- }
- // now call server to add listener
- invoke(new InternalInvocation(InternalInvocation.ADDLISTENER, null), internalMetadata, callbackLocator);
- }
- }
- }
-
- private String generateListenerId(InvokerCallbackHandler callbackhandler)
- {
- String listenerId = null;
- Object obj = listeners.get(callbackhandler);
- if(obj == null)
- {
- listenerId = new GUID().toString();
- listeners.put(callbackhandler, listenerId);
- }
- return listenerId;
- }
-
- /**
- * Adds the specified handler as a callback listener for pull (sync) callbacks.
- * Using this method will require the programatic getting of callbacks from the server
- * (they will not be pushed to the callback handler automatically).
- *
- * @param callbackHandler
- */
- public void addListener(InvokerCallbackHandler callbackHandler) throws Throwable
- {
- addListener(callbackHandler, (InvokerLocator) null);
- }
-
- /**
- * Adds the specified handler as a callback listener for push (async) callbacks.
- * The invoker server will then callback on this handler (via the server invoker
- * specified by the clientLocator) when it gets a callback from the server handler.
- * Note: passing a null clientLocator will cause the client invoker's client
- * locator to be set to null, which basically converts the mode to be pull (sync) where
- * will require call to get callbacks (as will not automatically be pushed to callback handler).
- *
- * @param callbackHandler
- * @param clientLocator
- * @throws Throwable
- */
- public void addListener(InvokerCallbackHandler callbackHandler,
- InvokerLocator clientLocator) throws Throwable
- {
- addListener(callbackHandler, clientLocator, null);
- }
-
- /**
- * Adds the specified handler as a callback listener for push (async) callbacks.
- * The invoker server will then callback on this handler (via the server invoker
- * specified by the clientLocator) when it gets a callback from the server handler.
- * Note: passing a null clientLocator will cause the client invoker's client
- * locator to be set to null, which basically converts the mode to be pull (sync) where
- * will require call to get callbacks (as will not automatically be pushed to callback handler).
- *
- * @param callbackHandler interface to call on with callback
- * @param clientLocator locator for callback server to callback on
- * @param callbackHandlerObject will be included in the callback object passed upon callback
- * @throws Throwable
- */
- public void addListener(InvokerCallbackHandler callbackHandler,
- InvokerLocator clientLocator, Object callbackHandlerObject) throws Throwable
- {
- if (callbackHandler != null)
- {
- if (isConnected())
- {
- addCallbackListener(callbackHandler, null, clientLocator, callbackHandlerObject);
- }
- else
- {
- throw new Exception("Can not add callback listener as remoting client is not connected to server.");
- }
- }
- else
- {
- throw new NullPointerException("InvokerCallbackHandler to be added as a listener can not be null.");
- }
- }
-
+ throw new NullPointerException("InvokerCallbackHandler to be added as " +
+ "a listener can not be null.");
+ }
+ }
/**
- * Removes callback handler as a callback listener from the server (and client in
- * the case that it was setup to receive async callbacks). See addListener().
- *
- * @param callbackHandler
- * @throws Throwable
+ * Removes callback handler as a callback listener from the server (and client in the case that
+ * it was setup to receive async callbacks). See addListener().
*/
public void removeListener(InvokerCallbackHandler callbackHandler) throws Throwable
{
@@ -1148,8 +946,11 @@
{
if (callbackHandler != null)
{
- // first need to see if is push or pull callback (i.e. does have locator associated with it)
+ // first need to see if is push or pull callback (i.e. does have locator associated
+ // with it)
+
String listenerId = (String)listeners.get(callbackHandler);
+
if(listenerId != null)
{
// have a pull callback handler
@@ -1158,7 +959,9 @@
invoke(new InternalInvocation(InternalInvocation.REMOVELISTENER, null), metadata);
// clean up callback poller if one exists
- CallbackPoller callbackPoller = (CallbackPoller) callbackPollers.remove(callbackHandler);
+ CallbackPoller callbackPoller =
+ (CallbackPoller)callbackPollers.remove(callbackHandler);
+
if (callbackPoller != null)
{
callbackPoller.stop();
@@ -1175,11 +978,14 @@
{
// have a push callback handler
List holderList = invoker.getClientLocators(sessionId, callbackHandler);
+
if(holderList != null && holderList.size() > 0)
{
for(int x = 0; x < holderList.size(); x++)
{
- AbstractInvoker.CallbackLocatorHolder holder = (AbstractInvoker.CallbackLocatorHolder)holderList.get(x);
+ AbstractInvoker.CallbackLocatorHolder holder =
+ (AbstractInvoker.CallbackLocatorHolder)holderList.get(x);
+
listenerId = holder.getListenerId();
InvokerLocator locator = holder.getLocator();
Map metadata = new HashMap();
@@ -1188,13 +994,17 @@
try
{
// now call target server to remove listener
- invoke(new InternalInvocation(InternalInvocation.REMOVELISTENER, null), metadata);
+
+ invoke(new InternalInvocation(InternalInvocation.REMOVELISTENER, null),
+ metadata);
}
catch (Exception e)
{
log.warn("unable to remove remote callback handler: " + e.getMessage());
}
+
// call to callback server to remove listener
+
Client client = new Client(locator, subsystem);
client.setSessionId(getSessionId());
client.connect();
@@ -1210,9 +1020,6 @@
}
}
-
-
-
// Map metadata = createListenerMetadata(callbackHandler);
// String listenerId = (String) metadata.get(LISTENER_ID_KEY);
// // connect to the given client locator and remove handler as listener
@@ -1231,7 +1038,9 @@
// invoke(new InternalInvocation(InternalInvocation.REMOVELISTENER, null), metadata);
// clean up callback server connector if one exists
+
Connector callbackConnector = (Connector) callbackConnectors.remove(callbackHandler);
+
if (callbackConnector != null)
{
callbackConnector.stop();
@@ -1252,33 +1061,34 @@
}
else
{
- throw new Exception("Can not remove callback listener as remoting client is not connected to server.");
+ throw new Exception("Can not remove callback listener as " +
+ "remoting client is not connected to server.");
}
}
/**
- * Gets the callbacks for specified callback handler. The handler is required because an id is generated
- * for each handler. So if have two callback handlers registered with the same server, no other way to know
- * for which handler to get the callbacks for.
- *
- * @param callbackHandler
- * @return
- * @throws Throwable
+ * Gets the callbacks for specified callback handler. The handler is required because an id is
+ * generated for each handler. So if have two callback handlers registered with the same server,
+ * no other way to know for which handler to get the callbacks for.
*/
public List getCallbacks(InvokerCallbackHandler callbackHandler) throws Throwable
{
if (callbackHandler != null)
{
String listenerId = (String)listeners.get(callbackHandler);
+
if(listenerId != null)
{
Map metadata = new HashMap();
metadata.put(LISTENER_ID_KEY, listenerId);
- return (List) invoke(new InternalInvocation(InternalInvocation.GETCALLBACKS, null), metadata);
+ return
+ (List)invoke(new InternalInvocation(InternalInvocation.GETCALLBACKS, null), metadata);
}
else
{
- log.error("Could not find listener id for InvokerCallbackHandler (" + callbackHandler + "), please verify handler has been registered as listener.");
+ log.error("Could not find listener id for InvokerCallbackHandler (" + callbackHandler +
+ "), please verify handler has been registered as listener.");
+
return null;
}
}
@@ -1294,8 +1104,8 @@
return acknowledgeCallback(callbackHandler, callback, null);
}
- public int acknowledgeCallback(InvokerCallbackHandler callbackHandler, Callback callback, Object response)
- throws Throwable
+ public int acknowledgeCallback(InvokerCallbackHandler callbackHandler, Callback callback,
+ Object response) throws Throwable
{
ArrayList callbacks = new ArrayList(1);
callbacks.add(callback);
@@ -1316,20 +1126,29 @@
return acknowledgeCallbacks(callbackHandler, callbacks, null);
}
- public int acknowledgeCallbacks(InvokerCallbackHandler callbackHandler, List callbacks, List responses)
- throws Throwable
+ public int acknowledgeCallbacks(InvokerCallbackHandler callbackHandler, List callbacks,
+ List responses) throws Throwable
{
if (callbackHandler == null)
+ {
throw new Exception("InvokerCallbackHandler parameter must not be null");
+ }
if (callbacks == null)
+ {
throw new Exception("Callback List parameter must not be null");
+ }
if (responses != null && responses.size() != callbacks.size())
- throw new Exception("Callback response list must be (1) null or (2) the same size as callback list");
+ {
+ throw new Exception("Callback response list must be (1) null " +
+ "or (2) the same size as callback list");
+ }
if (callbacks.size() == 0)
+ {
return 0;
+ }
if (isConnected())
{
@@ -1337,6 +1156,7 @@
Iterator idsIterator = callbacks.iterator();
ArrayList responseList = null;
Iterator responseIterator = null;
+
if (responses != null)
{
responseList = new ArrayList(responses.size());
@@ -1350,25 +1170,34 @@
for (int i = 0; i < callbacks.size(); i++)
{
callback = (Callback) idsIterator.next();
+
if (responseIterator != null)
+ {
response = responseIterator.next();
+ }
Map returnPayload = callback.getReturnPayload();
+
if (returnPayload != null)
{
Object callbackId = returnPayload.get(ServerInvokerCallbackHandler.CALLBACK_ID);
if (callbackId != null)
{
callbackIds.add(callbackId);
+
if (responseIterator != null)
+ {
responseList.add(response);
+ }
String nextListenerId = (String) returnPayload.get(LISTENER_ID_KEY);
+
if (nextListenerId == null)
{
throw new Exception("Cannot acknowledge callbacks: " +
"callback " + callbackId + " has null listener id");
}
+
if (i == 0)
{
listenerId = nextListenerId;
@@ -1382,7 +1211,8 @@
}
else
{
- log.error("Cannot acknowledge callback: callback id is missing from return payload");
+ log.error("Cannot acknowledge callback: callback id " +
+ "is missing from return payload");
}
}
else
@@ -1392,7 +1222,9 @@
}
if (callbackIds.size() == 0)
+ {
return 0;
+ }
Map metadata = new HashMap();
if(listenerId != null)
@@ -1401,12 +1233,14 @@
}
else
{
- throw new Exception("Could not find listener id for InvokerCallbackHandler (" + callbackHandler
- + "), please verify handler has been registered as listener.");
+ throw new Exception("Could not find listener id for InvokerCallbackHandler (" +
+ callbackHandler + "), please verify handler " +
+ "has been registered as listener.");
}
Object[] params = new Object[] {callbackIds, responseList};
- InternalInvocation invocation = new InternalInvocation(InternalInvocation.ACKNOWLEDGECALLBACK, params);
+ InternalInvocation invocation =
+ new InternalInvocation(InternalInvocation.ACKNOWLEDGECALLBACK, params);
invoke(invocation, metadata);
return callbackIds.size();
}
@@ -1417,11 +1251,8 @@
}
/**
- * Sets the marshaller implementation that should be used by the
- * client invoker (transport). This overrides the client's default
- * marshaller (or any set within configuration).
- *
- * @param marshaller
+ * Sets the marshaller implementation that should be used by the client invoker (transport). This
+ * overrides the client's default marshaller (or any set within configuration).
*/
public void setMarshaller(Marshaller marshaller)
{
@@ -1440,15 +1271,11 @@
{
throw new RuntimeException("Can not set remoting client Marshaller when not connected.");
}
-
}
/**
- * Sets the unmarshaller implementation that should be used
- * by the client invoker (transport). This overrides the client's default
- * unmarshaller (or any set within configuration).
- *
- * @param unmarshaller
+ * Sets the unmarshaller implementation that should be used by the client invoker (transport).
+ * This overrides the client's default unmarshaller (or any set within configuration).
*/
public void setUnMarshaller(UnMarshaller unmarshaller)
{
@@ -1471,14 +1298,13 @@
}
/**
- * Takes an inputstream and wraps a server around. Then calls the target
- * remoting server and passes a proxy for an inputstream to the server's handler.
- * When the server handler calls on this proxy, it will call back on this server
- * wrapped around this inputstream.
+ * Takes an inputstream and wraps a server around. Then calls the target remoting server and
+ * passes a proxy for an inputstream to the server's handler. When the server handler calls on
+ * this proxy, it will call back on this server wrapped around this inputstream.
*
- * @param inputStream
- * @param param invocation payload
- * @return the return value from the invocation
+ * @param param - invocation payload.
+ *
+ * @return the return value from the invocation.
* @throws Throwable
*/
public Object invoke(InputStream inputStream, Object param) throws Throwable
@@ -1487,106 +1313,262 @@
String locator = streamServer.getInvokerLocator();
// now call on target server and pass locator for stream callbacks
- InvocationRequest invocationRequest = new InvocationRequest(sessionId, subsystem, param, null, null, null);
- return invoke(new InternalInvocation(InternalInvocation.ADDSTREAMCALLBACK, new Object[]{locator, invocationRequest}), null);
+ InvocationRequest invocationRequest =
+ new InvocationRequest(sessionId, subsystem, param, null, null, null);
+ return invoke(new InternalInvocation(InternalInvocation.ADDSTREAMCALLBACK,
+ new Object[]{locator, invocationRequest}), null);
}
/**
- * Takes an inputstream and wraps a server around. Then calls the target
- * remoting server and passes a proxy for an inputstream to the server's handler.
- * When the server handler calls on this proxy, it will call back on this server
- * wrapped around this inputstream. The Connector passed is expected to have already been started and
- * will have the stream handler added with subsystem of 'stream'. Also note that the Connector passed
- * will not be stopped when/if the server calls to close the input stream.
+ * Takes an inputstream and wraps a server around. Then calls the target remoting server and
+ * passes a proxy for an inputstream to the server's handler. When the server handler calls on
+ * this proxy, it will call back on this server wrapped around this inputstream. The Connector
+ * passed is expected to have already been started and will have the stream handler added with
+ * subsystem of 'stream'. Also note that the Connector passed will not be stopped when/if the
+ * server calls to close the input stream.
+ *
+ * @param param - invocation payload.
*
- * @param inputStream
- * @param param invocation payload
* @return the return value from the invocation
- * @throws Throwable
*/
- public Object invoke(InputStream inputStream, Object param, Connector streamConnector) throws Throwable
+ public Object invoke(InputStream inputStream, Object param, Connector streamConnector)
+ throws Throwable
{
StreamServer streamServer = new StreamServer(inputStream, streamConnector);
String locator = streamServer.getInvokerLocator();
// now call on target server and pass locator for stream callbacks
- InvocationRequest invocationRequest = new InvocationRequest(sessionId, subsystem, param, null, null, null);
- return invoke(new InternalInvocation(InternalInvocation.ADDSTREAMCALLBACK, new Object[]{locator, invocationRequest}), null);
+ InvocationRequest invocationRequest =
+ new InvocationRequest(sessionId, subsystem, param, null, null, null);
+
+ return invoke(new InternalInvocation(InternalInvocation.ADDSTREAMCALLBACK,
+ new Object[]{locator, invocationRequest}), null);
}
/**
- * Takes an inputstream and wraps a server around. Then calls the target remoting server and passes proxy for
- * an inputstream to the server's handler. When the server handle calls on this proxy, it will call back on this server
- * wrapped around this inputstream. The InvokerLocator passed is used to create the internal Connector used to receive the
- * calls from the server side.
- * @param inputStream
- * @param param
- * @param streamServerLocator
- * @return
- * @throws Throwable
+ * Takes an inputstream and wraps a server around. Then calls the target remoting server and
+ * passes proxy for an inputstream to the server's handler. When the server handle calls on this
+ * proxy, it will call back on this server wrapped around this inputstream. The InvokerLocator
+ * passed is used to create the internal Connector used to receive the calls from the server
+ * side.
*/
- public Object invoke(InputStream inputStream, Object param, InvokerLocator streamServerLocator) throws Throwable
+ public Object invoke(InputStream inputStream, Object param, InvokerLocator streamServerLocator)
+ throws Throwable
{
StreamServer streamServer = new StreamServer(inputStream, streamServerLocator);
String locator = streamServer.getInvokerLocator();
// now call on target server and pass locator for stream callbacks
- InvocationRequest invocationRequest = new InvocationRequest(sessionId, subsystem, param, null, null, null);
- return invoke(new InternalInvocation(InternalInvocation.ADDSTREAMCALLBACK, new Object[]{locator, invocationRequest}), null);
+ InvocationRequest invocationRequest =
+ new InvocationRequest(sessionId, subsystem, param, null, null, null);
+ return invoke(new InternalInvocation(InternalInvocation.ADDSTREAMCALLBACK,
+ new Object[]{locator, invocationRequest}), null);
}
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ // Private --------------------------------------------------------------------------------------
+
+ private void connect(ClientInvoker invoker)
{
- int version = in.readInt();
+ if (invoker != null)
+ {
+ invoker.connect();
+ try
+ {
+ setupClientLease(invoker);
+ }
+ catch (Throwable throwable)
+ {
+ RuntimeException e =
+ new RuntimeException("Error setting up client lease upon performing connect.");
+ e.initCause(throwable);
+ throw e;
+ }
+ }
+ else
+ {
+ throw new RuntimeException("Client invoker is null (may have used void constructor " +
+ "for Client, which should only be used for Externalization.");
+ }
+ }
- switch (version)
+ private void setupClientLease(ClientInvoker invoker) throws Throwable
{
- case Version.VERSION_2:
+
+ // start with checking the locator url for hint as to if should do initial lease ping
+ if (invoker != null)
{
- InvokerLocator readLocator = (InvokerLocator) in.readObject();
- this.subsystem = (String) in.readObject();
- this.configuration = (Map) in.readObject();
- boolean wasConnected = in.readBoolean();
+ if (invoker instanceof LocalClientInvoker)
+ {
+ // no need to continue as won't do client lease when is local
+ // JBREM-382
+ return;
+ }
- this.classloader = Thread.currentThread().getContextClassLoader();
+ InvokerLocator locator = invoker.getLocator();
+ Map locatorParams = locator.getParameters();
+ if (locatorParams != null)
+ {
+ String leaseValue = (String) locatorParams.get(InvokerLocator.CLIENT_LEASE);
+ if (leaseValue != null && leaseValue.length() > 0)
+ {
+ enableLease = Boolean.valueOf(leaseValue).booleanValue();
+ }
+ String leasePeriodValue = (String) locatorParams.get(InvokerLocator.CLIENT_LEASE_PERIOD);
+ if (leasePeriodValue != null && leasePeriodValue.length() > 0)
+ {
try
{
- this.invoker = InvokerRegistry.createClientInvoker(readLocator, configuration);
- if(wasConnected)
+ leasePeriod = Long.parseLong(leasePeriodValue);
+ }
+ catch (NumberFormatException e)
{
- connect();
+ log.warn("Could not convert client lease period value (" +
+ leasePeriodValue + ") to a number.");
}
}
- catch (Exception e)
+ }
+ }
+ else
{
- log.error(e);
- throw new IOException(e.getMessage());
+ throw new RuntimeException("Can not set up client lease as client invoker is null.");
}
- break;
+ if (configuration != null)
+ {
+ Object val = configuration.get(ENABLE_LEASE);
+
+ if (val != null)
+ {
+ if (val instanceof Boolean)
+ {
+ enableLease = ((Boolean) val).booleanValue();
+ }
+ else if (val instanceof String)
+ {
+ enableLease = Boolean.valueOf((String) val).booleanValue();
+ }
+ else
+ {
+ log.warn("Can not evaluate " + ENABLE_LEASE + " value (" +
+ val + ") as a boolean type.");
+ }
+ }
+ String leasePeriodValue = (String) configuration.get(InvokerLocator.CLIENT_LEASE_PERIOD);
+ if (leasePeriodValue != null && leasePeriodValue.length() > 0)
+ {
+ try
+ {
+ leasePeriod = Long.parseLong(leasePeriodValue);
+ }
+ catch (NumberFormatException e)
+ {
+ log.warn("Could not convert client lease period value (" +
+ leasePeriodValue + ") to a number.");
}
- default:
- throw new StreamCorruptedException("Unkown version seen: " + version);
}
}
- public void writeExternal(ObjectOutput out) throws IOException
+ if (enableLease)
{
- out.writeInt(Version.getDefaultVersion());
- out.writeObject(invoker != null ? invoker.getLocator() : locator);
- out.writeObject(subsystem);
- out.writeObject(configuration);
- out.writeBoolean(isConnected());
- out.flush();
+ invoker.establishLease(sessionId, configuration, leasePeriod);
+ }
}
- /**
- * PLEASE DO NOT USE THIS CONSTRUCTOR OR YOUR COMPUTER WILL BURST INTO FLAMES!!!
- * It is only here so can externalize object and will provide
- * a dead object if invoker is not explicitly set. Please use
- * other contructors provided.
- */
- public Client()
+ private Object invoke(Object param, Map metadata, InvokerLocator callbackServerLocator)
+ throws Throwable
+ {
+ if (isConnected())
+ {
+ return invoker.invoke(new InvocationRequest(sessionId, subsystem, param,
+ metadata, null, callbackServerLocator));
+ }
+ else
+ {
+ throw new Exception("Can not make remoting client invocation " +
+ "due to not being connected to server.");
+ }
+ }
+
+ private void addCallbackListener(InvokerCallbackHandler callbackhandler, Map metadata,
+ InvokerLocator callbackLocator, Object callbackHandlerObject)
+ throws Throwable
+ {
+ // if callback locator is null, then is pull callbacks and need to track callback handler
+ // per Client (not by client invoker).
+ if (callbackLocator == null)
+ {
+ String listenerId = generateListenerId(callbackhandler);
+
+ // if listenerId is null, means this Client has already had the callbackhanler reference
+ // registered as a listener, so no need to add it again.
+ if (listenerId != null)
{
+ Map internalMetadata = new HashMap();
+ internalMetadata.put(LISTENER_ID_KEY, listenerId);
+ if(metadata != null)
+ {
+ internalMetadata.putAll(metadata);
}
+ // now call server to add listener
+ invoke(new InternalInvocation(InternalInvocation.ADDLISTENER, null),
+ internalMetadata, callbackLocator);
+ }
+ }
+ else
+ {
+ // is going to be push callbacks which means callback server locator involved.
+ // will have to delegate to client invoker.
+ String listenerId = invoker.addClientLocator(sessionId, callbackhandler, callbackLocator);
+
+ if (listenerId != null)
+ {
+
+ Map internalMetadata = new HashMap();
+ internalMetadata.put(LISTENER_ID_KEY, listenerId);
+ if(metadata != null)
+ {
+ internalMetadata.putAll(metadata);
+ }
+
+ Client client = new Client(callbackLocator, subsystem);
+ client.setSessionId(getSessionId());
+ client.connect();
+
+ try
+ {
+ InternalInvocation i =
+ new InternalInvocation(InternalInvocation.ADDCLIENTLISTENER,
+ new Object[]{callbackhandler, callbackHandlerObject});
+
+ client.invoke(i, internalMetadata);
+ }
+ finally
+ {
+ client.disconnect();
+ }
+
+ // now call server to add listener
+ invoke(new InternalInvocation(InternalInvocation.ADDLISTENER, null),
+ internalMetadata, callbackLocator);
+ }
+ }
+ }
+
+ private String generateListenerId(InvokerCallbackHandler callbackhandler)
+ {
+ String listenerId = null;
+ Object obj = listeners.get(callbackhandler);
+ if(obj == null)
+ {
+ listenerId = new GUID().toString();
+ listeners.put(callbackhandler, listenerId);
+ }
+ return listenerId;
+ }
+
+ // Inner classes --------------------------------------------------------------------------------
}
More information about the jboss-cvs-commits
mailing list