Author: ron.sigal(a)jboss.com
Date: 2008-06-09 21:43:37 -0400 (Mon, 09 Jun 2008)
New Revision: 4277
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/ConnectionValidator.java
Log:
JBREM-949: Added a Timer so checking for invocation response is made on a separate thread;
JBREM-860: eliminated "numberOfRetries" variable, which is no longer used in
socket transport.
Modified: remoting2/branches/2.2/src/main/org/jboss/remoting/ConnectionValidator.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/ConnectionValidator.java 2008-06-08
23:43:55 UTC (rev 4276)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/ConnectionValidator.java 2008-06-10
01:43:37 UTC (rev 4277)
@@ -32,6 +32,7 @@
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
+import java.util.Timer;
import java.util.TimerTask;
/**
@@ -61,12 +62,6 @@
* Currently implemented only on socket transport family.
*/
public static final String DEFAULT_NUMBER_OF_PING_RETRIES = "1";
-
- /**
- * Default number of connection acquisition retries. Value is 1.
- * Currently implemented only on socket transport family.
- */
- public static final String DEFAULT_NUMBER_OF_CONNECTION_RETRIES = "1";
/**
* Key to determine if ConnectionValidator should tie failure to presence
@@ -95,6 +90,7 @@
{
boolean pingWorked = false;
Map configMap = createPingConfig(config, null);
+ int pingTimeout = Integer.parseInt((String) configMap.get(ServerInvoker.TIMEOUT));
ClientInvoker innerClientInvoker = null;
try
@@ -107,7 +103,7 @@
innerClientInvoker.connect();
}
- pingWorked = doCheckConnection(innerClientInvoker);
+ pingWorked = doCheckConnection(innerClientInvoker, pingTimeout);
}
catch (Throwable throwable)
{
@@ -127,7 +123,7 @@
return pingWorked;
}
- private static boolean doCheckConnection(ClientInvoker clientInvoker) throws
Throwable
+ private static boolean doCheckConnection(ClientInvoker clientInvoker, int pingTimeout)
throws Throwable
{
boolean pingWorked = false;
@@ -136,16 +132,12 @@
// Sending null client id as don't want to trigger lease on server side.
This also means
// that client connection validator will NOT impact client lease, so can not
depend on it
// to maintain client lease with the server.
- InvocationRequest ir =
- new InvocationRequest(null, Subsystem.SELF, "$PING$", null, null,
null);
-
- if (trace) { log.trace("pinging, sending " + ir + " over " +
clientInvoker); }
-
- clientInvoker.invoke(ir);
-
- if (trace) { log.trace("ConnectionValidator got successful ping using
" + clientInvoker);}
-
- pingWorked = true;
+ InvocationRequest ir;
+ ir = new InvocationRequest(null, Subsystem.SELF, "$PING$", null, null,
null);
+ ConnectionCheckThread t = new ConnectionCheckThread(clientInvoker, ir);
+ t.start();
+ Thread.sleep(pingTimeout);
+ pingWorked = t.isValid();
}
catch (Throwable t)
{
@@ -165,7 +157,18 @@
Object o = config.get(VALIDATOR_PING_TIMEOUT);
log.trace("config timeout: " + o);
if (o != null)
- localConfig.put(ServerInvoker.TIMEOUT, o);
+ {
+ try
+ {
+ Integer.parseInt((String) o);
+ localConfig.put(ServerInvoker.TIMEOUT, o);
+ }
+ catch (NumberFormatException e)
+ {
+ log.warn("Need integer for value of parameter " +
VALIDATOR_PING_TIMEOUT +
+ ". Using default value " + DEFAULT_PING_TIMEOUT);
+ }
+ }
o = config.get("NumberOfCallRetries");
if (o != null)
@@ -182,7 +185,18 @@
localConfig.putAll(metadata);
Object o = metadata.get(VALIDATOR_PING_TIMEOUT);
if (o != null)
- localConfig.put(ServerInvoker.TIMEOUT, o);
+ {
+ try
+ {
+ Integer.parseInt((String) o);
+ localConfig.put(ServerInvoker.TIMEOUT, o);
+ }
+ catch (NumberFormatException e)
+ {
+ log.warn("Need integer for value of parameter " +
VALIDATOR_PING_TIMEOUT +
+ ". Using default value " + DEFAULT_PING_TIMEOUT);
+ }
+ }
}
if (localConfig.get(ServerInvoker.TIMEOUT) == null)
@@ -191,9 +205,6 @@
if (localConfig.get("NumberOfCallRetries") == null)
localConfig.put("NumberOfCallRetries",
DEFAULT_NUMBER_OF_PING_RETRIES);
- if (localConfig.get("NumberOfRetries") == null)
- localConfig.put("NumberOfRetries",
DEFAULT_NUMBER_OF_CONNECTION_RETRIES);
-
return localConfig;
}
@@ -207,10 +218,14 @@
private List listeners;
private ClientInvoker clientInvoker;
private Object lock = new Object();
+ private Object notificationLock = new Object();
private volatile boolean stopped;
private String invokerSessionId;
private boolean tieToLease = true;
private boolean stopLeaseOnFailure = true;
+ private int pingTimeout;
+ private boolean isValid;
+ private Timer timer;
// Constructors
---------------------------------------------------------------------------------
@@ -259,15 +274,26 @@
*/
public void run()
{
- synchronized(lock)
+ TimerTask tt = new WaitOnConnectionCheckTimerTask();
+
+ try
{
- if(!stopped)
+ timer.schedule(tt, 0);
+ }
+ catch (IllegalStateException e)
+ {
+ log.debug("Unable to schedule TimerTask on existing Timer", e);
+ timer = new Timer(true);
+ timer.schedule(tt, 0);
+ }
+
+ try
+ {
+ synchronized(lock)
{
- try
+ if(!stopped)
{
- if (trace) { log.trace(this + " pinging ..."); }
-
- boolean isValid = false;
+ isValid = false;
if (tieToLease && client.getLeasePeriod() > 0)
{
@@ -276,37 +302,29 @@
}
else
{
- isValid = doCheckConnection(clientInvoker);
+ if (trace) { log.trace(this + " pinging ..."); }
+ isValid = doCheckConnectionWithoutLease();
}
-
- if (!isValid)
- {
- log.debug(this + "'s connections is invalid");
-
- notifyListeners(new Exception("Could not connect to
server!"));
-
- if (stopLeaseOnFailure)
- {
- log.debug(this + " detected connection failure: stopping
LeasePinger");
- MicroRemoteClientInvoker invoker = (MicroRemoteClientInvoker)
client.getInvoker();
- invoker.terminateLease(null, client.getDisconnectTimeout());
- log.debug(this + " shut down lease pinger");
- }
- }
}
- catch (Throwable thr)
- {
- log.debug(this + " got throwable while pinging", thr);
- notifyListeners(thr);
-
- if (stopLeaseOnFailure)
- {
- log.debug(this + " detected connection failure: stopping");
- cancel();
- }
- }
}
}
+ catch (Throwable thr)
+ {
+ log.debug(this + " got throwable while pinging", thr);
+
+ if (stopLeaseOnFailure)
+ {
+ log.debug(this + " detected connection failure: stopping");
+ cancel();
+ }
+ }
+ finally
+ {
+ synchronized (notificationLock)
+ {
+ notificationLock.notifyAll();
+ }
+ }
}
public boolean cancel()
@@ -460,7 +478,8 @@
private void start()
{
configMap = createPingConfig(client.getConfiguration(), metadata);
- log.debug(this + " timeout: " + configMap.get(ServerInvoker.TIMEOUT));
+ pingTimeout = Integer.parseInt((String) configMap.get(ServerInvoker.TIMEOUT));
+ log.debug(this + " timeout: " + pingTimeout);
log.debug(this + " ping retries: " +
configMap.get("NumberOfCallRetries"));
log.debug(this + " connection retries: " +
configMap.get("NumberOfRetries"));
locator = client.getInvoker().getLocator();
@@ -471,7 +490,7 @@
}
catch (Exception e)
{
- log.error("Unable to create client invoker for locator: " + locator);
+ log.debug("Unable to create client invoker for locator: " + locator);
throw new RuntimeException("Unable to create client invoker for locator:
" + locator, e);
}
@@ -483,7 +502,7 @@
TimerUtil.schedule(this, pingPeriod);
stopped = false;
-
+ timer = new Timer(true);
log.debug(this + " started");
}
@@ -518,7 +537,35 @@
return pingWorked;
}
+
+ private boolean doCheckConnectionWithoutLease() throws Throwable
+ {
+ boolean pingWorked = false;
+ try
+ {
+ // Sending null client id as don't want to trigger lease on server side.
This also means
+ // that client connection validator will NOT impact client lease, so can not
depend on it
+ // to maintain client lease with the server.
+ InvocationRequest ir =
+ new InvocationRequest(null, Subsystem.SELF, "$PING$", null, null,
null);
+
+ if (trace) { log.trace("pinging, sending " + ir + " over " +
clientInvoker); }
+
+ clientInvoker.invoke(ir);
+
+ if (trace) { log.trace("ConnectionValidator got successful ping using
" + clientInvoker);}
+
+ pingWorked = true;
+ }
+ catch (Throwable t)
+ {
+ log.debug("ConnectionValidator failed to ping via " + clientInvoker,
t);
+ }
+
+ return pingWorked;
+ }
+
private boolean doStop()
{
synchronized(lock)
@@ -569,4 +616,89 @@
// Inner classes
--------------------------------------------------------------------------------
+ private class WaitOnConnectionCheckTimerTask extends TimerTask
+ {
+ public void run()
+ {
+ long start = System.currentTimeMillis();
+ synchronized (notificationLock)
+ {
+ while (true)
+ {
+ int elapsed = (int) (System.currentTimeMillis() - start);
+ int wait = pingTimeout - elapsed;
+ if (wait <= 0) break;
+
+ try
+ {
+ notificationLock.wait(wait);
+ break;
+ }
+ catch (InterruptedException e)
+ {
+ continue;
+ }
+ }
+ }
+
+ if (!isValid)
+ {
+ log.debug(ConnectionValidator.this + "'s connections is
invalid");
+
+ notifyListeners(new Exception("Could not connect to server!"));
+
+ if (stopLeaseOnFailure)
+ {
+ log.debug(this + " detected connection failure: stopping
LeasePinger");
+ MicroRemoteClientInvoker invoker = (MicroRemoteClientInvoker)
client.getInvoker();
+
+ if (invoker != null)
+ {
+ invoker.terminateLease(null, client.getDisconnectTimeout());
+ log.debug(ConnectionValidator.this + " shut down lease
pinger");
+ }
+ else
+ {
+ log.debug(ConnectionValidator.this + " unable to shut down lease
pinger: client must have shut down");
+ }
+
+ cancel();
+ }
+ }
+ }
+ }
+
+ private static class ConnectionCheckThread extends Thread
+ {
+ private InvocationRequest ir;
+ private ClientInvoker clientInvoker;
+ private boolean isValid;
+
+ public ConnectionCheckThread(ClientInvoker clientInvoker, InvocationRequest ir)
+ {
+ this.clientInvoker = clientInvoker;
+ this.ir = ir;
+ setDaemon(true);
+ }
+
+ public void run()
+ {
+ try
+ {
+ if (trace) { log.trace("pinging, sending " + ir + " over
" + clientInvoker); }
+ clientInvoker.invoke(ir);
+ isValid = true;
+ if (trace) { log.trace("ConnectionValidator got successful ping using
" + clientInvoker);}
+ }
+ catch (Throwable t)
+ {
+ log.debug("ConnectionValidator failed to ping via " +
clientInvoker, t);
+ }
+ }
+
+ public boolean isValid()
+ {
+ return isValid;
+ }
+ }
}
\ No newline at end of file