[jboss-remoting-commits] JBoss Remoting SVN: r5216 - remoting2/branches/2.x/src/main/org/jboss/remoting.
jboss-remoting-commits at lists.jboss.org
jboss-remoting-commits at lists.jboss.org
Thu May 14 21:58:55 EDT 2009
Author: ron.sigal at jboss.com
Date: 2009-05-14 21:58:55 -0400 (Thu, 14 May 2009)
New Revision: 5216
Modified:
remoting2/branches/2.x/src/main/org/jboss/remoting/ConnectionValidator.java
Log:
JBREM-1132: Introduced "stopping" and "useClientConnectionIdentity".
Modified: remoting2/branches/2.x/src/main/org/jboss/remoting/ConnectionValidator.java
===================================================================
--- remoting2/branches/2.x/src/main/org/jboss/remoting/ConnectionValidator.java 2009-05-15 01:40:05 UTC (rev 5215)
+++ remoting2/branches/2.x/src/main/org/jboss/remoting/ConnectionValidator.java 2009-05-15 01:58:55 UTC (rev 5216)
@@ -22,11 +22,11 @@
package org.jboss.remoting;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
-import java.util.ListIterator;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@@ -117,7 +117,7 @@
}
catch (Throwable throwable)
{
- log.debug("ConnectionValidator to connect to server " +
+ log.debug("ConnectionValidator unable to connect to server " +
innerClientInvoker.getLocator().getProtocol() + "://" +
innerClientInvoker.getLocator().getHost() + ":" +
innerClientInvoker.getLocator().getPort(), throwable);
@@ -227,12 +227,13 @@
private Map metadata;
private InvokerLocator locator;
private Map configMap;
- private List listeners;
+ private Map listeners;
private ClientInvoker clientInvoker;
private Object lock = new Object();
private Object notificationLock = new Object();
private boolean started;
private volatile boolean stopped;
+ private volatile boolean stopping;
private String invokerSessionId;
private boolean tieToLease = true;
private boolean stopLeaseOnFailure = true;
@@ -240,6 +241,9 @@
private int failureDisconnectTimeout = -1;
private boolean isValid;
private Timer timer;
+ private MicroRemoteClientInvoker sharedInvoker;
+ private LeasePinger leasePinger;
+ private boolean useClientConnectionIdentity;
// Constructors ---------------------------------------------------------------------------------
@@ -251,9 +255,10 @@
public ConnectionValidator(Client client, long pingPeriod)
{
this.client = client;
+ this.locator = client.getInvoker().getLocator();
this.pingPeriod = pingPeriod;
pingTimeout = DEFAULT_PING_TIMEOUT_INT;
- listeners = new ArrayList();
+ listeners = new HashMap();
stopped = false;
getParameters(client, new HashMap());
log.debug(this + " created");
@@ -262,9 +267,10 @@
public ConnectionValidator(Client client, Map metadata)
{
this.client = client;
+ this.locator = client.getInvoker().getLocator();
pingPeriod = DEFAULT_PING_PERIOD;
pingTimeout = DEFAULT_PING_TIMEOUT_INT;
- listeners = new ArrayList();
+ listeners = new HashMap();
stopped = false;
this.metadata = new HashMap(metadata);
getParameters(client, metadata);
@@ -300,7 +306,7 @@
".addConnectionListener() instead.");
}
- if (stopped)
+ if (stopping)
{
return;
}
@@ -321,7 +327,7 @@
try
{
- if(!stopped)
+ if(!stopping)
{
isValid = false;
@@ -366,41 +372,84 @@
// Public ---------------------------------------------------------------------------------------
- public void addConnectionListener(ConnectionListener listener)
+ public boolean addConnectionListener(Client client, ConnectionListener listener)
{
+ boolean doStart = false;
if (listener != null)
{
synchronized (lock)
{
+ if (stopping)
+ {
+ if (trace) log.trace(this + " is stopped. Cannot add ConnectionListener: " + listener + " for " + client);
+ return false;
+ }
if (listeners.size() == 0)
{
- start();
+ doStart = true;
}
- listeners.add(listener);
+ Set s = (Set) listeners.get(listener);
+ if (s == null)
+ {
+ s = new HashSet();
+ }
+ s.add(client);
+ listeners.put(listener, s);
+ log.debug(this + " added ConnectionListener: " + listener + " for " + client);
}
+ if (doStart)
+ {
+ start();
+ }
}
+
+ return true;
}
- public boolean removeConnectionListener(ConnectionListener listener)
+ public boolean removeConnectionListener(Client client, ConnectionListener listener)
{
- boolean isRemoved = false;
- if (listener != null)
+ if (listener == null)
{
- synchronized (lock)
+ if (trace) log.trace(this + " ConnectionListener is null");
+ return false;
+ }
+ synchronized (lock)
+ {
+ if (stopping)
{
- isRemoved = listeners.remove(listener);
- if (listeners.size() == 0)
- {
- stop();
- }
+ if (trace) log.trace(this + " is stopped. It's too late to remove " + listener);
+ return false;
}
+ Set s = (Set) listeners.get(listener);
+ if (s == null)
+ {
+ log.debug(this + ": " + listener + " is not registered");
+ return false;
+ }
+ if (s.remove(client))
+ {
+ log.debug(this + " removed ConnectionListener: " + listener + " for " + client);
+ }
+ else
+ {
+ log.debug(this + ": " + listener + " is not registered for " + client);
+ return false;
+ }
+ if (s.size() == 0)
+ {
+ listeners.remove(listener);
+ }
+ if (listeners.size() == 0)
+ {
+ stop();
+ }
}
- return isRemoved;
+ return true;
}
public long getPingPeriod()
{
- if (stopped)
+ if (stopping)
{
return -1;
}
@@ -412,9 +461,52 @@
{
return "ConnectionValidator[" + Integer.toHexString(System.identityHashCode(this)) + ":" + clientInvoker + ", pingPeriod=" + pingPeriod + " ms]";
}
+
+ public boolean isStopped()
+ {
+ return stopped;
+ }
// Package protected ----------------------------------------------------------------------------
+ void notifyListeners(Throwable thr)
+ {
+ final Throwable t = thr;
+ synchronized (lock)
+ {
+ if (stopping)
+ {
+ return;
+ }
+ stopping = true;
+ if (trace) log.trace(this + " is stopped. No more listeners will be accepted.");
+
+ Iterator itr = listeners.keySet().iterator();
+ while (itr.hasNext())
+ {
+ final ConnectionListener listener = (ConnectionListener) itr.next();
+ Set clients = (Set) listeners.get(listener);
+ Iterator itr2 = clients.iterator();
+ while (itr2.hasNext())
+ {
+ final Client client = (Client) itr2.next();
+ new Thread()
+ {
+ public void run()
+ {
+ log.debug(ConnectionValidator.this + " calling " + listener + ".handleConnectionException() for " + client);
+ listener.handleConnectionException(t, client);
+ }
+ }.start();
+ }
+ }
+
+ listeners.clear();
+ }
+
+ stop();
+ }
+
// Protected ------------------------------------------------------------------------------------
// Private --------------------------------------------------------------------------------------
@@ -431,12 +523,21 @@
ClientInvoker clientInvoker = client.getInvoker();
if (clientInvoker instanceof MicroRemoteClientInvoker)
{
- invokerSessionId = ((MicroRemoteClientInvoker) clientInvoker).getSessionId();
+ sharedInvoker = (MicroRemoteClientInvoker) clientInvoker;
+ invokerSessionId = sharedInvoker.getSessionId();
}
else
{
throw new RuntimeException("creating a ConnectionValidator on a local connection");
}
+ if (stopLeaseOnFailure)
+ {
+ if (sharedInvoker != null)
+ {
+ leasePinger = sharedInvoker.getLeasePinger();
+ }
+ }
+ if (trace) log.trace(this + ": sharedInvoker = " + sharedInvoker + ", leasePinger = " + leasePinger);
}
private boolean checkUseParametersFromLocator(Client client, Map metadata)
@@ -580,6 +681,7 @@
}
o = config.get(FAILURE_DISCONNECT_TIMEOUT);
+ if (trace) log.trace(this + " \"failureDisconnectTimeout\" set to " + o);
if (o != null)
{
if (o instanceof String)
@@ -587,6 +689,7 @@
try
{
failureDisconnectTimeout = Integer.valueOf(((String) o)).intValue();
+ if (trace) log.trace(this + " setting failureDisconnectTimeout to " + failureDisconnectTimeout);
}
catch (Exception e)
{
@@ -600,6 +703,27 @@
" to an int: must be a String");
}
}
+ o = config.get(Remoting.USE_CLIENT_CONNECTION_IDENTITY);
+ if (o != null)
+ {
+ if (o instanceof String)
+ {
+ try
+ {
+ useClientConnectionIdentity = Boolean.valueOf(((String) o)).booleanValue();
+ }
+ catch (Exception e)
+ {
+ log.warn(this + " could not convert " + Remoting.USE_CLIENT_CONNECTION_IDENTITY + " value" +
+ " to a boolean: " + o);
+ }
+ }
+ else
+ {
+ log.warn(this + " could not convert " + Remoting.USE_CLIENT_CONNECTION_IDENTITY + " value" +
+ " to a boolean: must be a String");
+ }
+ }
}
}
@@ -610,7 +734,6 @@
log.debug(this + ": pingPeriod: " + this.pingPeriod);
log.debug(this + ": pingTimeout: " + this.pingTimeout);
log.debug(this + ": ping retries: " + configMap.get("NumberOfCallRetries"));
- locator = client.getInvoker().getLocator();
try
{
@@ -628,7 +751,14 @@
clientInvoker.connect();
}
- TimerUtil.schedule(this, pingPeriod);
+ try
+ {
+ TimerUtil.schedule(this, pingPeriod);
+ }
+ catch (Exception e)
+ {
+ log.error(this + " unable to schedule on TimerUtil", e);
+ }
started = true;
timer = new Timer(true);
log.debug(this + " started");
@@ -696,6 +826,7 @@
private boolean doStop()
{
+ if (trace) log.trace("entering doStop()");
synchronized(lock)
{
if (stopped)
@@ -707,6 +838,7 @@
{
listeners.clear();
}
+ stopping = true;
stopped = true;
timer = null;
}
@@ -722,34 +854,7 @@
log.debug(this + " stopped, returning " + result);
return result;
}
-
- private void notifyListeners(Throwable thr)
- {
- final Throwable t = thr;
- synchronized (lock)
- {
- if (stopped)
- {
- return;
- }
- ListIterator itr = listeners.listIterator();
- while (itr.hasNext())
- {
- final ConnectionListener listener = (ConnectionListener) itr.next();
- new Thread()
- {
- public void run()
- {
- log.debug(this + " calling " + listener + ".handleConnectionException()");
- listener.handleConnectionException(t, client);
- }
- }.start();
- }
- }
- stop();
- listeners.clear();
- }
-
+
// Inner classes --------------------------------------------------------------------------------
private class WaitOnConnectionCheckTimerTask extends TimerTask
@@ -783,27 +888,30 @@
if (!isValid)
{
log.debug(ConnectionValidator.this + "'s connection is invalid");
-
- notifyListeners(new Exception("Could not connect to server!"));
+ ConnectionValidator.super.cancel();
if (stopLeaseOnFailure)
{
- log.debug(this + " detected connection failure: stopping LeasePinger");
- MicroRemoteClientInvoker invoker = (MicroRemoteClientInvoker) client.getInvoker();
-
- if (invoker != null)
+ if (trace) log.trace(ConnectionValidator.this + " detected connection failure: stopping LeasePinger");
+ if (leasePinger != null)
{
+ log.debug(ConnectionValidator.this + " shutting down lease pinger: " + leasePinger);
int disconnectTimeout = (failureDisconnectTimeout == -1) ? client.getDisconnectTimeout() : failureDisconnectTimeout;
- invoker.terminateLease(null, disconnectTimeout);
- log.debug(ConnectionValidator.this + " shut down lease pinger");
+ if (trace) log.trace(ConnectionValidator.this + " disconnectTimeout: " + disconnectTimeout);
+ sharedInvoker.terminateLease(null, disconnectTimeout, leasePinger);
}
else
{
- log.debug(ConnectionValidator.this + " unable to shut down lease pinger: client must have shut down");
+ if (trace) log.trace(ConnectionValidator.this + ": lease pinger == null: perhaps leasing is not enabled for this connection");
+ notifyListeners(new Exception("Could not connect to server!"));
}
cancel();
}
+ if (!useClientConnectionIdentity)
+ {
+ notifyListeners(new Exception("Could not connect to server!"));
+ }
}
}
}
More information about the jboss-remoting-commits
mailing list