[jboss-remoting-commits] JBoss Remoting SVN: r5214 - remoting2/branches/2.x/src/main/org/jboss/remoting.
jboss-remoting-commits at lists.jboss.org
jboss-remoting-commits at lists.jboss.org
Thu May 14 21:34:55 EDT 2009
Author: ron.sigal at jboss.com
Date: 2009-05-14 21:34:54 -0400 (Thu, 14 May 2009)
New Revision: 5214
Modified:
remoting2/branches/2.x/src/main/org/jboss/remoting/Client.java
Log:
JBREM-1132: (1) Added new connect() and notifyListeners(); (2) Clients can share ConnectionValidators; (3) dialed down logging.
Modified: remoting2/branches/2.x/src/main/org/jboss/remoting/Client.java
===================================================================
--- remoting2/branches/2.x/src/main/org/jboss/remoting/Client.java 2009-05-13 08:01:07 UTC (rev 5213)
+++ remoting2/branches/2.x/src/main/org/jboss/remoting/Client.java 2009-05-15 01:34:54 UTC (rev 5214)
@@ -167,13 +167,20 @@
public static final String INVOKER_DESTRUCTION_DELAY = "invokerDestructionDelay";
public static final String THROW_CALLBACK_EXCEPTION = "throwCallbackException";
+
+ private static Map connectionValidators = new HashMap();
+ private static Object connectionValidatorLock = new Object();
+ static final String CLIENT = "client";
+ static final String CONNECTION_LISTENER = "connectionListener";
+
/** The key to use to specify that parameters for objects created by Client should be taken,
* in addition to the metadata map, from the InvokerLocator and from the Client's configuration map.
*/
public static final String USE_ALL_PARAMS = "useAllParams";
private static final Logger log = Logger.getLogger(Client.class);
+ private static boolean trace = log.isTraceEnabled();
private static final long serialVersionUID = 5679279425009837934L;
@@ -198,6 +205,7 @@
private InvokerLocator locator;
private ConnectionValidator connectionValidator = null;
+ private ConnectionValidatorKey connectionValidatorKey;
private Map configuration = new HashMap();
private Map callbackConnectors = new HashMap();
@@ -213,6 +221,10 @@
private int invokerDestructionDelay = 0;
+ private Set connectionListeners = new HashSet();
+
+ private boolean useClientConnectionIdentity;
+
// Constructors ---------------------------------------------------------------------------------
/**
@@ -420,7 +432,7 @@
if (invoker == null)
{
throw new RuntimeException("Can not add connection listener to remoting client " +
- "until client has been connected.");
+ "while client is not connected.");
}
else
{
@@ -431,11 +443,57 @@
}
}
- if (connectionValidator == null)
+ synchronized (connectionValidatorLock)
{
- connectionValidator = new ConnectionValidator(this, metadata);
+ if (trace) log.trace(this + " in addConnectionListener()");
+ if (connectionValidator == null)
+ {
+ Map map = new HashMap(configuration);
+ map.putAll(metadata);
+ connectionValidatorKey = new ConnectionValidatorKey(invoker, map);
+ WeakReference ref = (WeakReference) connectionValidators.get(connectionValidatorKey);
+ if (ref == null)
+ {
+ connectionValidator = new ConnectionValidator(this, metadata);
+ connectionValidators.put(connectionValidatorKey, new WeakReference(connectionValidator));
+ connectionValidator.addConnectionListener(this, listener);
+ if (trace) log.trace(this + ": created " + connectionValidator);
+ }
+ else
+ {
+ connectionValidator = (ConnectionValidator) ref.get();
+ if (connectionValidator.addConnectionListener(this, listener))
+ {
+ if (trace) log.trace(this + ": reusing from static table: " + connectionValidator);
+ }
+ else
+ {
+ if (trace) log.trace(this + ": unable to reuse existing ConnectionValidator in static map: " + connectionValidator);
+ connectionValidator = new ConnectionValidator(this, metadata);
+ connectionValidators.put(connectionValidatorKey, new WeakReference(connectionValidator));
+ connectionValidator.addConnectionListener(this, listener);
+ if (trace) log.trace(this + ": current ConnectionValidator is stopped: created " + connectionValidator);
+ }
+ }
+ }
+ else
+ {
+ if (connectionValidator.addConnectionListener(this, listener))
+ {
+ if (trace) log.trace(this + ": reusing from local reference: " + connectionValidator);
+ }
+ else
+ {
+ if (trace) log.trace(this + ": unable to reuse ConnectionValidator from local reference: " + connectionValidator);
+ connectionValidator = new ConnectionValidator(this, metadata);
+ connectionValidators.put(connectionValidatorKey, new WeakReference(connectionValidator));
+ connectionValidator.addConnectionListener(this, listener);
+ if (trace) log.trace(this + ": current ConnectionValidator is stopped: created " + connectionValidator);
+ }
+ }
+
+ connectionListeners.add(listener);
}
- connectionValidator.addConnectionListener(listener);
}
/**
@@ -444,11 +502,36 @@
*/
public boolean removeConnectionListener(ConnectionListener listener)
{
- if (connectionValidator == null)
+ if (trace) log.trace(this + ".removeConnectionListener(" + listener + ")");
+ boolean isRemoved = false;
+ synchronized (connectionValidatorLock)
{
- return false;
+ if (connectionValidator == null)
+ {
+ return false;
+ }
+ isRemoved = connectionValidator.removeConnectionListener(this, listener);
+ if (connectionValidator.isStopped())
+ {
+ if (connectionValidators.remove(connectionValidatorKey) != null)
+ {
+ log.debug(this + ".removeConnectionListener() removed from static map: " + connectionValidator);
+ }
+ connectionValidator = null;
+ connectionValidatorKey = null;
+ }
+ connectionListeners.remove(listener);
+ if (connectionListeners.isEmpty())
+ {
+ connectionValidator = null;
+ connectionValidatorKey = null;
+ }
+ if (connectionValidator == null)
+ {
+ if (trace) log.trace(this + " set connectionValidator to null");
+ }
}
- return connectionValidator.removeConnectionListener(listener);
+ return isRemoved;
}
/**
@@ -494,6 +577,41 @@
*/
public void connect() throws Exception
{
+ connect(null, null);
+ }
+
+ /**
+ * Will cause the underlying transport to make connection to the target server. This is
+ * important for any stateful transports, like socket or multiplex. This is also when a client
+ * lease with the server is started. If listener is not null, it will be registered to
+ * receive a callback if the connection fails.
+ */
+ public void connect(ConnectionListener listener) throws Exception
+ {
+ connect(listener, null);
+ }
+
+ /**
+ * Will cause the underlying transport to make connection to the target server. This is
+ * important for any stateful transports, like socket or multiplex. This is also when a client
+ * lease with the server is started. If listener is not null, it will be registered to
+ * receive a callback if the connection fails.
+ * <p>
+ *
+ * If this version of connect() is used, and leasing is enabled, the concept of "connection
+ * identity" is enforced. That is, the ConnectionValidator used by this Client will be
+ * tied to the LeasePinger currently used by the MicroRemoteClientInvoker created or reused
+ * in this method, and that LeasePinger will be tied to this Client and its ConnectionValidator.
+ * If the ConnectionValidator used by any of the Clients associated with the MicroRemoteClientInvoker
+ * used by this Client detects a broken connection, it will shut down that LeasePinger.
+ * Moreover, each ConnectionValidator associated with that LeasePinger will notify its
+ * ConnectionListeners of the broken connection. At that point, the LeasePinger will be
+ * destroyed, and all of the associated Clients will be disconnected.
+ */
+ public void connect(ConnectionListener listener, Map metadata) throws Exception
+ {
+ log.debug(this + ".connect(" + listener + ")");
+ if (trace) log.trace(this + ": metadata = " + metadata);
if (isConnected())
return;
@@ -512,9 +630,10 @@
invoker = InvokerRegistry.createClientInvoker(locator, configuration);
}
- connect(invoker);
+ connect(invoker, listener, metadata);
connected = true;
+ log.debug(this + " is connected");
}
/**
@@ -525,17 +644,15 @@
*/
public void disconnect()
{
+ if (trace) log.trace(this + " entering disconnect()");
+
+ connected = false;
+
if (invoker != null)
{
// this is a noop if no lease is active
invoker.terminateLease(sessionId, disconnectTimeout);
-
- if (connectionValidator != null)
- {
- connectionValidator.stop();
- connectionValidator = null;
- }
-
+
// Need to remove myself from registry so will not keep reference to me since I am of no
// use now. Will have to create a new one.
@@ -560,7 +677,7 @@
invokerDestructionTimer.schedule(task, invokerDestructionDelay);
}
- log.trace(this + " scheduled destruction of " + invoker);
+ if (trace) log.trace(this + " scheduled destruction of " + invoker);
}
}
else
@@ -570,8 +687,30 @@
invoker = null;
}
-
- connected = false;
+
+ synchronized (connectionValidatorLock)
+ {
+ if (connectionValidator != null)
+ {
+ Iterator it = connectionListeners.iterator();
+ while (it.hasNext())
+ {
+ ConnectionListener listener = (ConnectionListener) it.next();
+ connectionValidator.removeConnectionListener(this, listener);
+ }
+ if (connectionValidator.isStopped())
+ {
+ if (connectionValidators.remove(connectionValidatorKey) != null)
+ {
+ if (trace) log.trace(this + ".disconnect() removed from static map: " + connectionValidator);
+ }
+ }
+
+ connectionValidator = null;
+ connectionValidatorKey = null;
+ }
+ }
+ log.debug(this + " is disconnected");
}
/**
@@ -1252,7 +1391,7 @@
{
if (e.getCause() != null && e.getCause() instanceof SocketTimeoutException)
{
- if (log.isTraceEnabled()) log.trace(this + ": getCallbacks() timed out: returning empty list");
+ if (trace) log.trace(this + ": getCallbacks() timed out: returning empty list");
return new ArrayList();
}
throw e;
@@ -1596,23 +1735,61 @@
public String toString()
{
- return "Client[" + System.identityHashCode(this) + "]";
+ return "Client[" + System.identityHashCode(this) + ":" + sessionId + "]";
}
// Package protected ----------------------------------------------------------------------------
+ void notifyListeners()
+ {
+ synchronized (connectionValidatorLock)
+ {
+ log.debug(this + " entering notifyListeners(): " + connectionValidator);
+ if (connectionValidator != null)
+ {
+ synchronized (connectionValidator)
+ {
+ if (connectionValidator.isStopped())
+ {
+ if (trace) log.trace(this + ": " + connectionValidator + " is stopped");
+ }
+ else
+ {
+ if (trace) log.trace(this + ": " + connectionValidator + " is not stopped");
+ if (trace) log.trace(this + " calling connectionValidator.notifyListeners()");
+ connectionValidator.notifyListeners(new Exception("Could not connect to server!"));
+ Iterator it = connectionListeners.iterator();
+ while (it.hasNext())
+ {
+ ConnectionListener listener = (ConnectionListener) it.next();
+ connectionValidator.removeConnectionListener(this, listener);
+ }
+ if (connectionValidators.remove(connectionValidatorKey) != null)
+ {
+ if (trace) log.trace(this + ".notifyAndDisconnect() removed from static map: " + connectionValidator);
+ }
+ }
+ }
+ connectionValidator = null;
+ connectionValidatorKey = null;
+ }
+
+ log.debug(this + " leaving notifyListeners()");
+ }
+ }
+
// Protected ------------------------------------------------------------------------------------
// Private --------------------------------------------------------------------------------------
- private void connect(ClientInvoker invoker)
+ private void connect(ClientInvoker invoker, ConnectionListener listener, Map metadata)
{
if (invoker != null)
{
invoker.connect();
try
{
- setupClientLease(invoker);
+ setupClientLease(invoker, listener, metadata);
}
catch (Throwable throwable)
{
@@ -1621,6 +1798,7 @@
e.initCause(throwable);
throw e;
}
+ log.debug(this + " connected to " + locator);
}
else
{
@@ -1629,7 +1807,7 @@
}
}
- private void setupClientLease(ClientInvoker invoker) throws Throwable
+ private void setupClientLease(ClientInvoker invoker, ConnectionListener listener, Map metadata) throws Throwable
{
long leasePeriod = -1;
boolean enableLease = false;
@@ -1710,10 +1888,26 @@
}
}
+ if (trace) log.trace(this + " enableLease: " + enableLease);
if (enableLease)
{
- invoker.establishLease(sessionId, configuration, leasePeriod);
+ Map temp = new HashMap(configuration);
+ if (metadata != null)
+ {
+ temp.putAll(metadata);
+ }
+ if (useClientConnectionIdentity)
+ {
+ temp.put(CLIENT, this);
+ temp.put(CONNECTION_LISTENER, listener);
+ }
+ if (trace) log.trace(this + " calling MicroRemoteClientInvoker.establishLease()");
+ invoker.establishLease(sessionId, temp, leasePeriod);
}
+ else if (listener != null)
+ {
+ addConnectionListener(listener, metadata);
+ }
}
private Object invoke(Object param, Map metadata, InvokerLocator callbackServerLocator)
@@ -1833,6 +2027,28 @@
{
log.error("invokerDestructionDelay parameter must be a string in integer format: " + param);
}
+
+ param = configuration.get(Remoting.USE_CLIENT_CONNECTION_IDENTITY);
+ if (param instanceof String)
+ {
+ useClientConnectionIdentity = Boolean.valueOf((String) param).booleanValue();
+ }
+ else if (param != null)
+ {
+ log.warn("value of " + Remoting.USE_CLIENT_CONNECTION_IDENTITY + " must be a String: " + param);
+ }
+ else
+ {
+ if (locator.getParameters() != null)
+ {
+ param = locator.getParameters().get(Remoting.USE_CLIENT_CONNECTION_IDENTITY);
+ if (param != null)
+ {
+ useClientConnectionIdentity = Boolean.valueOf((String) param).booleanValue();
+ this.configuration.put(Remoting.USE_CLIENT_CONNECTION_IDENTITY, param);
+ }
+ }
+ }
}
private void configureCallbackServerSocketFactory(Map map) throws Exception
@@ -1864,6 +2080,34 @@
}
}
+ static class ConnectionValidatorKey
+ {
+ private ClientInvoker invoker;
+ private Map metadata;
+
+ ConnectionValidatorKey(ClientInvoker invoker, Map metadata)
+ {
+ this.invoker = invoker;
+ this.metadata = metadata;
+ }
+
+ public boolean equals(Object o)
+ {
+ if (o == null)
+ return false;
+ if (! (o instanceof ConnectionValidatorKey))
+ return false;
+ ConnectionValidatorKey holder = (ConnectionValidatorKey) o;
+ boolean metadataEquals = (metadata == null && holder.metadata == null) || metadata.equals(holder.metadata);
+ return invoker == holder.invoker && metadataEquals;
+ }
+
+ public int hashCode()
+ {
+ return invoker.hashCode() * metadata.hashCode();
+ }
+ }
+
static private InetAddress getLocalHost() throws UnknownHostException
{
if (SecurityUtility.skipAccessControl())
More information about the jboss-remoting-commits
mailing list