Author: ron.sigal(a)jboss.com
Date: 2009-05-10 01:15:12 -0400 (Sun, 10 May 2009)
New Revision: 5166
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/Client.java
Log:
JBREM-1128: Various changes to implement connection identity.
Modified: remoting2/branches/2.2/src/main/org/jboss/remoting/Client.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/Client.java 2009-05-08 18:12:22 UTC
(rev 5165)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/Client.java 2009-05-10 05:15:12 UTC
(rev 5166)
@@ -49,6 +49,7 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.StreamCorruptedException;
+import java.lang.ref.WeakReference;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.rmi.MarshalException;
@@ -158,7 +159,14 @@
*/
public static final String USE_ALL_PARAMS = "useAllParams";
+ private static Map connectionValidators = new HashMap();
+ private static Object connectionValidatorLock = new Object();
+
+ static final String CLIENT = "client";
+ static final String CONNECTION_LISTENER = "connectionListener";
+
private static final Logger log = Logger.getLogger(Client.class);
+ private static boolean trace = log.isTraceEnabled();
private static final long serialVersionUID = 5679279425009837934L;
@@ -180,6 +188,7 @@
private InvokerLocator locator;
private ConnectionValidator connectionValidator = null;
+ private ConnectionValidatorKey connectionValidatorKey;
private Map configuration = new HashMap();
private Map callbackConnectors = new HashMap();
@@ -192,6 +201,10 @@
private int disconnectTimeout = DEFAULT_DISCONNECT_TIMEOUT;
private boolean connected = false;
+
+ private Set connectionListeners = new HashSet();
+
+ private boolean useClientConnectionIdentity;
// Constructors
---------------------------------------------------------------------------------
@@ -269,6 +282,27 @@
if (configuration != null)
{
this.configuration = new HashMap(configuration);
+ Object o = configuration.get(Remoting.USE_CLIENT_CONNECTION_IDENTITY);
+ if (o instanceof String)
+ {
+ useClientConnectionIdentity = Boolean.valueOf((String) o).booleanValue();
+ }
+ else if (o != null)
+ {
+ log.warn("value of " + Remoting.USE_CLIENT_CONNECTION_IDENTITY +
" must be a String: " + o);
+ }
+ else
+ {
+ if (locator.getParameters() != null)
+ {
+ o = locator.getParameters().get(Remoting.USE_CLIENT_CONNECTION_IDENTITY);
+ if (o != null)
+ {
+ useClientConnectionIdentity = Boolean.valueOf((String)
o).booleanValue();
+ this.configuration.put(Remoting.USE_CLIENT_CONNECTION_IDENTITY, o);
+ }
+ }
+ }
}
this.sessionId = new GUID().toString();
}
@@ -379,7 +413,7 @@
if (invoker == null)
{
throw new RuntimeException("Can not add connection listener to remoting
client " +
- "until client has been connected.");
+ "while client is not connected.");
}
else
{
@@ -390,11 +424,56 @@
}
}
- if (connectionValidator == null)
+ synchronized (connectionValidatorLock)
{
- connectionValidator = new ConnectionValidator(this, metadata);
+ if (connectionValidator == null)
+ {
+ Map map = new HashMap(configuration);
+ map.putAll(metadata);
+ connectionValidatorKey = new ConnectionValidatorKey(invoker, map);
+ WeakReference ref = (WeakReference)
connectionValidators.get(connectionValidatorKey);
+ if (ref == null)
+ {
+ connectionValidator = new ConnectionValidator(this, metadata);
+ connectionValidators.put(connectionValidatorKey, new
WeakReference(connectionValidator));
+ connectionValidator.addConnectionListener(this, listener);
+ log.debug(this + ": created " + connectionValidator);
+ }
+ else
+ {
+ connectionValidator = (ConnectionValidator) ref.get();
+ if (connectionValidator.addConnectionListener(this, listener))
+ {
+ if (trace) log.trace(this + ": reusing from static table: "
+ connectionValidator);
+ }
+ else
+ {
+ if (trace) log.trace(this + ": unable to reuse existing
ConnectionValidator in static map: " + connectionValidator);
+ connectionValidator = new ConnectionValidator(this, metadata);
+ connectionValidators.put(connectionValidatorKey, new
WeakReference(connectionValidator));
+ connectionValidator.addConnectionListener(this, listener);
+ log.debug(this + ": current ConnectionValidator is stopped:
created " + connectionValidator);
+ }
+ }
+ }
+ else
+ {
+ if (connectionValidator.addConnectionListener(this, listener))
+ {
+ if (trace) log.trace(this + ": reusing from local reference: " +
connectionValidator);
+ }
+ else
+ {
+ if (trace) log.trace(this + ": unable to reuse ConnectionValidator
from local reference: " + connectionValidator);
+ connectionValidator = new ConnectionValidator(this, metadata);
+ connectionValidators.put(connectionValidatorKey, new
WeakReference(connectionValidator));
+ connectionValidator.addConnectionListener(this, listener);
+ if (trace) log.trace(this + ": current ConnectionValidator is
stopped: created " + connectionValidator);
+ }
+ }
+
+ connectionListeners.add(listener);
}
- connectionValidator.addConnectionListener(listener);
}
/**
@@ -403,11 +482,36 @@
*/
public boolean removeConnectionListener(ConnectionListener listener)
{
- if (connectionValidator == null)
+ if (trace) log.trace(this + ".removeConnectionListener(" + listener +
")");
+ boolean isRemoved = false;
+ synchronized (connectionValidatorLock)
{
- return false;
+ if (connectionValidator == null)
+ {
+ return false;
+ }
+ isRemoved = connectionValidator.removeConnectionListener(this, listener);
+ if (connectionValidator.isStopped())
+ {
+ if (connectionValidators.remove(connectionValidatorKey) != null)
+ {
+ log.debug(this + ".removeConnectionListener() removed from static
map: " + connectionValidator);
+ }
+ connectionValidator = null;
+ connectionValidatorKey = null;
+ }
+ connectionListeners.remove(listener);
+ if (connectionListeners.isEmpty())
+ {
+ connectionValidator = null;
+ connectionValidatorKey = null;
+ }
+ if (connectionValidator == null)
+ {
+ if (trace) log.trace(this + " set connectionValidator to null");
+ }
}
- return connectionValidator.removeConnectionListener(listener);
+ return isRemoved;
}
/**
@@ -453,6 +557,41 @@
*/
public void connect() throws Exception
{
+ connect(null, null);
+ }
+
+ /**
+ * Will cause the underlying transport to make connection to the target server. This
is
+ * important for any stateful transports, like socket or multiplex. This is also when
a client
+ * lease with the server is started. If listener is not null, it will be registered
to
+ * receive a callback if the connection fails.
+ */
+ public void connect(ConnectionListener listener) throws Exception
+ {
+ connect(listener, null);
+ }
+
+ /**
+ * Will cause the underlying transport to make connection to the target server. This
is
+ * important for any stateful transports, like socket or multiplex. This is also when
a client
+ * lease with the server is started. If listener is not null, it will be registered
to
+ * receive a callback if the connection fails.
+ * <p>
+ *
+ * If this version of connect() is used, and leasing is enabled, the concept of
"connection
+ * identity" is enforced. That is, the ConnectionValidator used by this Client
will be
+ * tied to the LeasePinger currently used by the MicroRemoteClientInvoker created or
reused
+ * in this method, and that LeasePinger will be tied to this Client and its
ConnectionValidator.
+ * If the ConnectionValidator used by any of the Clients associated with the
MicroRemoteClientInvoker
+ * used by this Client detects a broken connection, it will shut down that
LeasePinger.
+ * Moreover, each ConnectionValidator associated with that LeasePinger will notify
its
+ * ConnectionListeners of the broken connection. At that point, the LeasePinger will
be
+ * destroyed, and all of the associated Clients will be disconnected.
+ */
+ public void connect(ConnectionListener listener, Map metadata) throws Exception
+ {
+ log.debug(this + ".connect(" + listener + ")");
+
if (isConnected())
return;
@@ -471,7 +610,7 @@
invoker = InvokerRegistry.createClientInvoker(locator, configuration);
}
- connect(invoker);
+ connect(invoker, listener, metadata);
connected = true;
}
@@ -484,25 +623,44 @@
*/
public void disconnect()
{
+ if (trace) log.trace(this + " entering disconnect()");
+
+ connected = false;
+
if (invoker != null)
{
// this is a noop if no lease is active
invoker.terminateLease(sessionId, disconnectTimeout);
+
+ // Need to remove myself from registry so will not keep reference to me since I
am of no
+ // use now. Will have to create a new one.
+ InvokerRegistry.destroyClientInvoker(invoker.getLocator(), configuration);
+ invoker = null;
+ }
+ synchronized (connectionValidatorLock)
+ {
if (connectionValidator != null)
{
- connectionValidator.stop();
+ Iterator it = connectionListeners.iterator();
+ while (it.hasNext())
+ {
+ ConnectionListener listener = (ConnectionListener) it.next();
+ connectionValidator.removeConnectionListener(this, listener);
+ }
+ if (connectionValidator.isStopped())
+ {
+ if (connectionValidators.remove(connectionValidatorKey) != null)
+ {
+ if (trace) log.trace(this + ".disconnect() removed from static
map: " + connectionValidator);
+ }
+ }
+
connectionValidator = null;
+ connectionValidatorKey = null;
}
-
- // Need to remove myself from registry so will not keep reference to me since I
am of no
- // use now. Will have to create a new one.
-
- InvokerRegistry.destroyClientInvoker(invoker.getLocator(), configuration);
- invoker = null;
}
-
- connected = false;
+ log.debug(this + " is disconnected");
}
/**
@@ -1183,7 +1341,7 @@
{
if (e.getCause() != null && e.getCause() instanceof
SocketTimeoutException)
{
- if (log.isTraceEnabled()) log.trace(this + ": getCallbacks() timed
out: returning empty list");
+ if (trace) log.trace(this + ": getCallbacks() timed out: returning
empty list");
return new ArrayList();
}
throw e;
@@ -1517,23 +1675,61 @@
public String toString()
{
- return "Client[" + System.identityHashCode(this) + "]";
+ return "Client[" + System.identityHashCode(this) + ":" +
sessionId + "]";
}
// Package protected
----------------------------------------------------------------------------
+ void notifyListeners()
+ {
+ synchronized (connectionValidatorLock)
+ {
+ log.debug(this + " entering notifyListeners(): " +
connectionValidator);
+ if (connectionValidator != null)
+ {
+ synchronized (connectionValidator)
+ {
+ if (connectionValidator.isStopped())
+ {
+ if (trace) log.trace(this + ": " + connectionValidator +
" is stopped");
+ }
+ else
+ {
+ if (trace) log.trace(this + ": " + connectionValidator +
" is not stopped");
+ if (trace) log.trace(this + " calling
connectionValidator.notifyListeners()");
+ connectionValidator.notifyListeners(new Exception("Could not
connect to server!"));
+ Iterator it = connectionListeners.iterator();
+ while (it.hasNext())
+ {
+ ConnectionListener listener = (ConnectionListener) it.next();
+ connectionValidator.removeConnectionListener(this, listener);
+ }
+ if (connectionValidators.remove(connectionValidatorKey) != null)
+ {
+ if (trace) log.trace(this + ".notifyAndDisconnect() removed
from static map: " + connectionValidator);
+ }
+ }
+ }
+ connectionValidator = null;
+ connectionValidatorKey = null;
+ }
+
+ log.debug(this + " leaving notifyListeners()");
+ }
+ }
+
// Protected
------------------------------------------------------------------------------------
// Private
--------------------------------------------------------------------------------------
- private void connect(ClientInvoker invoker)
+ private void connect(ClientInvoker invoker, ConnectionListener listener, Map
metadata)
{
if (invoker != null)
{
invoker.connect();
try
{
- setupClientLease(invoker);
+ setupClientLease(invoker, listener, metadata);
}
catch (Throwable throwable)
{
@@ -1542,6 +1738,7 @@
e.initCause(throwable);
throw e;
}
+ log.debug(this + " connected to " + locator);
}
else
{
@@ -1550,7 +1747,7 @@
}
}
- private void setupClientLease(ClientInvoker invoker) throws Throwable
+ private void setupClientLease(ClientInvoker invoker, ConnectionListener listener, Map
metadata) throws Throwable
{
long leasePeriod = -1;
boolean enableLease = false;
@@ -1631,10 +1828,26 @@
}
}
+ if (trace) log.trace(this + " enableLease: " + enableLease);
if (enableLease)
{
- invoker.establishLease(sessionId, configuration, leasePeriod);
+ Map temp = new HashMap(configuration);
+ if (metadata != null)
+ {
+ temp.putAll(metadata);
+ }
+ if (useClientConnectionIdentity)
+ {
+ temp.put(CLIENT, this);
+ temp.put(CONNECTION_LISTENER, listener);
+ }
+ if (trace) log.trace(this + " calling
MicroRemoteClientInvoker.establishLease()");
+ invoker.establishLease(sessionId, temp, leasePeriod);
}
+ else if (listener != null)
+ {
+ addConnectionListener(listener, metadata);
+ }
}
private Object invoke(Object param, Map metadata, InvokerLocator
callbackServerLocator)
@@ -1731,4 +1944,32 @@
// Inner classes
--------------------------------------------------------------------------------
+
+ static class ConnectionValidatorKey
+ {
+ private ClientInvoker invoker;
+ private Map metadata;
+
+ ConnectionValidatorKey(ClientInvoker invoker, Map metadata)
+ {
+ this.invoker = invoker;
+ this.metadata = metadata;
+ }
+
+ public boolean equals(Object o)
+ {
+ if (o == null)
+ return false;
+ if (! (o instanceof ConnectionValidatorKey))
+ return false;
+ ConnectionValidatorKey holder = (ConnectionValidatorKey) o;
+ boolean metadataEquals = (metadata == null && holder.metadata == null)
|| metadata.equals(holder.metadata);
+ return invoker == holder.invoker && metadataEquals;
+ }
+
+ public int hashCode()
+ {
+ return invoker.hashCode() * metadata.hashCode();
+ }
+ }
}